BUG-1845: implement proper shutdown sequence
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMDataCommitCoordinatorImpl.java
index 3fde8d360f8af6df8cb0bcd705a9e3289d9fd35e..15d7b1d966e1a59e028616bb85f5670f5805860b 100644 (file)
@@ -6,13 +6,17 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import java.util.List;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
-
-import javax.annotation.concurrent.GuardedBy;
-
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -21,17 +25,6 @@ import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableList.Builder;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
 /**
  *
  * Implementation of blocking three phase commit coordinator, which which
@@ -49,28 +42,8 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
 
     private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
-
-    /**
-     * Runs AND binary operation between all booleans in supplied iteration of booleans.
-     *
-     * This method will stop evaluating iterables if first found is false.
-     */
-    private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
-
-        @Override
-        public Boolean apply(final Iterable<Boolean> input) {
-            for(boolean value : input) {
-               if(!value) {
-                   return Boolean.FALSE;
-               }
-            }
-            return Boolean.TRUE;
-        }
-    };
-
-    private final ListeningExecutorService executor;
-
     private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker();
+    private final ListeningExecutorService executor;
 
     /**
      *
@@ -89,16 +62,15 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
 
     @Override
     public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
-            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
+            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
-        Preconditions.checkArgument(listener != null, "Listener must not be null");
         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
 
         ListenableFuture<Void> commitFuture = null;
         try {
             commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
-                    listener, commitStatsTracker));
+                    commitStatsTracker));
         } catch(RejectedExecutionException e) {
             LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
                       executor, e);
@@ -107,10 +79,6 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
                         "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
         }
 
-        if (listener.isPresent()) {
-            Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
-        }
-
         return MappingCheckedFuture.create(commitFuture,
                 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
     }
@@ -153,45 +121,43 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
     }
 
     /**
-     *
      * Implementation of blocking three-phase commit-coordination tasks without
-     * support of cancelation.
-     *
+     * support of cancellation.
      */
-    private static class CommitCoordinationTask implements Callable<Void> {
-
+    private static final class CommitCoordinationTask implements Callable<Void> {
+        private static final AtomicReferenceFieldUpdater<CommitCoordinationTask, CommitPhase> PHASE_UPDATER =
+                AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase");
         private final DOMDataWriteTransaction tx;
         private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
         private final DurationStatsTracker commitStatTracker;
-
-        @GuardedBy("this")
-        private CommitPhase currentPhase;
+        private final int cohortSize;
+        private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED;
 
         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
-                final Optional<DOMDataCommitErrorListener> listener,
                 final DurationStatsTracker commitStatTracker) {
             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
-            this.currentPhase = CommitPhase.SUBMITTED;
             this.commitStatTracker = commitStatTracker;
+            this.cohortSize = Iterables.size(cohorts);
         }
 
         @Override
         public Void call() throws TransactionCommitFailedException {
+            final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
 
-            long startTime = System.nanoTime();
             try {
                 canCommitBlocking();
                 preCommitBlocking();
                 commitBlocking();
                 return null;
             } catch (TransactionCommitFailedException e) {
-                LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
-                abortBlocking(e);
+                final CommitPhase phase = currentPhase;
+                LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
+                abortBlocking(e, phase);
                 throw e;
             } finally {
-                if(commitStatTracker != null) {
+                if (commitStatTracker != null) {
                     commitStatTracker.addDuration(System.nanoTime() - startTime);
                 }
             }
@@ -210,78 +176,63 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
          *
          */
         private void canCommitBlocking() throws TransactionCommitFailedException {
-            final Boolean canCommitResult = canCommitAll().checkedGet();
-            if (!canCommitResult) {
-                throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
+            for (ListenableFuture<?> canCommit : canCommitAll()) {
+                try {
+                    final Boolean result = (Boolean)canCommit.get();
+                    if (result == null || !result) {
+                        throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
+                    }
+                } catch (InterruptedException | ExecutionException e) {
+                    throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
+                }
             }
         }
 
         /**
          *
-         * Invokes preCommit on underlying cohorts and blocks till
-         * all results are returned.
+         * Invokes canCommit on underlying cohorts and returns composite future
+         * which will contains {@link Boolean#TRUE} only and only if
+         * all cohorts returned true.
          *
-         * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
-         * state is not CAN_COMMIT
-         * throws IllegalStateException.
+         * Valid state transition is from SUBMITTED to CAN_COMMIT,
+         * if currentPhase is not SUBMITTED throws IllegalStateException.
          *
-         * @throws TransactionCommitFailedException
-         *             If one of cohorts failed preCommit
+         * @return List of all cohorts futures from can commit phase.
          *
          */
-        private void preCommitBlocking() throws TransactionCommitFailedException {
-            preCommitAll().checkedGet();
+        private ListenableFuture<?>[] canCommitAll() {
+            changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
+
+            final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+            int i = 0;
+            for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+                ops[i++] = cohort.canCommit();
+            }
+            return ops;
         }
 
         /**
          *
-         * Invokes commit on underlying cohorts and blocks till
+         * Invokes preCommit on underlying cohorts and blocks till
          * all results are returned.
          *
-         * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
-         * IllegalStateException.
+         * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
+         * state is not CAN_COMMIT
+         * throws IllegalStateException.
          *
          * @throws TransactionCommitFailedException
          *             If one of cohorts failed preCommit
          *
          */
-        private void commitBlocking() throws TransactionCommitFailedException {
-            commitAll().checkedGet();
-        }
-
-        /**
-         * Aborts transaction.
-         *
-         * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
-         * cohorts, blocks
-         * for all results. If any of the abort failed throws
-         * IllegalStateException,
-         * which will contains originalCause as suppressed Exception.
-         *
-         * If aborts we're successful throws supplied exception
-         *
-         * @param originalCause
-         *            Exception which should be used to fail transaction for
-         *            consumers of transaction
-         *            future and listeners of transaction failure.
-         * @throws TransactionCommitFailedException
-         *             on invocation of this method.
-         *             originalCa
-         * @throws IllegalStateException
-         *             if abort failed.
-         */
-        private void abortBlocking(final TransactionCommitFailedException originalCause)
-                throws TransactionCommitFailedException {
-            LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
-            Exception cause = originalCause;
+        private void preCommitBlocking() throws TransactionCommitFailedException {
+            final ListenableFuture<?>[] preCommitFutures = preCommitAll();
             try {
-                abortAsyncAll().get();
+                for(ListenableFuture<?> future : preCommitFutures) {
+                    future.get();
+                }
             } catch (InterruptedException | ExecutionException e) {
-                LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
-                cause = new IllegalStateException("Abort failed.", e);
-                cause.addSuppressed(e);
+                throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
             }
-            Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
         }
 
         /**
@@ -295,27 +246,41 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
          * state is not CAN_COMMIT
          * throws IllegalStateException.
          *
-         * @return Future which will complete once all cohorts completed
-         *         preCommit.
-         *         Future throws TransactionCommitFailedException
-         *         If any of cohorts failed preCommit
+         * @return List of all cohorts futures from can commit phase.
          *
          */
-        private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
+        private ListenableFuture<?>[] preCommitAll() {
             changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
-            Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+
+            final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+            int i = 0;
             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
-                ops.add(cohort.preCommit());
+                ops[i++] = cohort.preCommit();
+            }
+            return ops;
+        }
+
+        /**
+         *
+         * Invokes commit on underlying cohorts and blocks till
+         * all results are returned.
+         *
+         * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
+         * IllegalStateException.
+         *
+         * @throws TransactionCommitFailedException
+         *             If one of cohorts failed preCommit
+         *
+         */
+        private void commitBlocking() throws TransactionCommitFailedException {
+            final ListenableFuture<?>[] commitFutures = commitAll();
+            try {
+                for(ListenableFuture<?> future : commitFutures) {
+                    future.get();
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
             }
-            /*
-             * We are returing all futures as list, not only succeeded ones in
-             * order to fail composite future if any of them failed.
-             * See Futures.allAsList for this description.
-             */
-            @SuppressWarnings({ "unchecked", "rawtypes" })
-            ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
-            return MappingCheckedFuture.create(compositeResult,
-                                         TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
         }
 
         /**
@@ -327,80 +292,80 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
          * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
          * IllegalStateException
          *
-         * @return Future which will complete once all cohorts completed
-         *         commit.
-         *         Future throws TransactionCommitFailedException
-         *         If any of cohorts failed preCommit
+         * @return List of all cohorts futures from can commit phase.
          *
          */
-        private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
+        private ListenableFuture<?>[] commitAll() {
             changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
-            Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+
+            final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+            int i = 0;
             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
-                ops.add(cohort.commit());
+                ops[i++] = cohort.commit();
             }
-            /*
-             * We are returing all futures as list, not only succeeded ones in
-             * order to fail composite future if any of them failed.
-             * See Futures.allAsList for this description.
-             */
-            @SuppressWarnings({ "unchecked", "rawtypes" })
-            ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
-            return MappingCheckedFuture.create(compositeResult,
-                                     TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+            return ops;
         }
 
         /**
+         * Aborts transaction.
          *
-         * Invokes canCommit on underlying cohorts and returns composite future
-         * which will contains {@link Boolean#TRUE} only and only if
-         * all cohorts returned true.
-         *
-         * Valid state transition is from SUBMITTED to CAN_COMMIT,
-         * if currentPhase is not SUBMITTED throws IllegalStateException.
+         * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
+         * cohorts, blocks
+         * for all results. If any of the abort failed throws
+         * IllegalStateException,
+         * which will contains originalCause as suppressed Exception.
          *
-         * @return Future which will complete once all cohorts completed
-         *         preCommit.
-         *         Future throws TransactionCommitFailedException
-         *         If any of cohorts failed preCommit
+         * If aborts we're successful throws supplied exception
          *
+         * @param originalCause
+         *            Exception which should be used to fail transaction for
+         *            consumers of transaction
+         *            future and listeners of transaction failure.
+         * @param phase phase in which the problem ensued
+         * @throws TransactionCommitFailedException
+         *             on invocation of this method.
+         *             originalCa
+         * @throws IllegalStateException
+         *             if abort failed.
          */
-        private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
-            changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
-            Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
-            for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
-                canCommitOperations.add(cohort.canCommit());
+        private void abortBlocking(final TransactionCommitFailedException originalCause, final CommitPhase phase)
+                throws TransactionCommitFailedException {
+            LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, originalCause);
+            Exception cause = originalCause;
+            try {
+                abortAsyncAll(phase).get();
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
+                cause = new IllegalStateException("Abort failed.", e);
+                cause.addSuppressed(e);
             }
-            ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
-            ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
-            return MappingCheckedFuture.create(allSuccessFuture,
-                                       TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
-
+            Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
         }
 
         /**
-         *
          * Invokes abort on underlying cohorts and returns future which
-         * completes
-         * once all abort on cohorts are completed.
+         * completes once all abort on cohorts are completed.
          *
+         * @param phase phase in which the problem ensued
          * @return Future which will complete once all cohorts completed
          *         abort.
-         *
          */
-        private ListenableFuture<Void> abortAsyncAll() {
-            changeStateFrom(currentPhase, CommitPhase.ABORT);
-            Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+        private ListenableFuture<Void> abortAsyncAll(final CommitPhase phase) {
+            changeStateFrom(phase, CommitPhase.ABORT);
+
+            final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+            int i = 0;
             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
-                ops.add(cohort.abort());
+                ops[i++] = cohort.abort();
             }
+
             /*
-             * We are returing all futures as list, not only succeeded ones in
+             * We are returning all futures as list, not only succeeded ones in
              * order to fail composite future if any of them failed.
              * See Futures.allAsList for this description.
              */
             @SuppressWarnings({ "unchecked", "rawtypes" })
-            ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
+            ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops);
             return compositeResult;
         }
 
@@ -423,14 +388,13 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
          * @throws IllegalStateException
          *             If currentState of task does not match expected state
          */
-        private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
-            Preconditions.checkState(currentPhase.equals(currentExpected),
-                    "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
-                    currentPhase, newState);
-            LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
-            currentPhase = newState;
-        };
+        private void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
+            final boolean success = PHASE_UPDATER.compareAndSet(this, currentExpected, newState);
+            Preconditions.checkState(success, "Invalid state transition: Tx: %s expected: %s current: %s target: %s",
+                tx.getIdentifier(), currentExpected, currentPhase, newState);
 
+            LOG.debug("Transaction {}: Phase {} Started", tx.getIdentifier(), newState);
+        };
     }
 
 }