Bug 650: Removed uncessary wrapping and allocation of futures 40/11040/3
authorRobert Varga <rovarga@cisco.com>
Thu, 11 Sep 2014 11:58:32 +0000 (13:58 +0200)
committerRobert Varga <rovarga@cisco.com>
Thu, 11 Sep 2014 19:59:57 +0000 (21:59 +0200)
There is no need to use an intermediate checkfuture just to communicate
the list over, so let's get rid of the warpping. At the same time we
eliminate lists and opt for arrays. The sizing requirements for arrays
are determined precisely once, when a coordinator is allocated.

Change-Id: Id5c4c7852210b99c4da3e9e77a84c0d2d0fa622d
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java

index 3fde8d360f8af6df8cb0bcd705a9e3289d9fd35e..d796ab35fa88522bd7f89d4802adaffe9c3d0a8e 100644 (file)
@@ -6,13 +6,18 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import java.util.List;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
-
 import javax.annotation.concurrent.GuardedBy;
-
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -21,17 +26,6 @@ import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableList.Builder;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
 /**
  *
  * Implementation of blocking three phase commit coordinator, which which
@@ -49,28 +43,8 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
 
     private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
-
-    /**
-     * Runs AND binary operation between all booleans in supplied iteration of booleans.
-     *
-     * This method will stop evaluating iterables if first found is false.
-     */
-    private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
-
-        @Override
-        public Boolean apply(final Iterable<Boolean> input) {
-            for(boolean value : input) {
-               if(!value) {
-                   return Boolean.FALSE;
-               }
-            }
-            return Boolean.TRUE;
-        }
-    };
-
-    private final ListeningExecutorService executor;
-
     private final DurationStatsTracker commitStatsTracker = new DurationStatsTracker();
+    private final ListeningExecutorService executor;
 
     /**
      *
@@ -163,6 +137,7 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
         private final DOMDataWriteTransaction tx;
         private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
         private final DurationStatsTracker commitStatTracker;
+        private final int cohortSize;
 
         @GuardedBy("this")
         private CommitPhase currentPhase;
@@ -175,6 +150,7 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
             this.currentPhase = CommitPhase.SUBMITTED;
             this.commitStatTracker = commitStatTracker;
+            this.cohortSize = Iterables.size(cohorts);
         }
 
         @Override
@@ -210,10 +186,39 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
          *
          */
         private void canCommitBlocking() throws TransactionCommitFailedException {
-            final Boolean canCommitResult = canCommitAll().checkedGet();
-            if (!canCommitResult) {
-                throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
+            for (ListenableFuture<?> canCommit : canCommitAll()) {
+                try {
+                    final Boolean result = (Boolean)canCommit.get();
+                    if (result == null || !result) {
+                        throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
+                    }
+                } catch (InterruptedException | ExecutionException e) {
+                    throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
+                }
+            }
+        }
+
+        /**
+         *
+         * Invokes canCommit on underlying cohorts and returns composite future
+         * which will contains {@link Boolean#TRUE} only and only if
+         * all cohorts returned true.
+         *
+         * Valid state transition is from SUBMITTED to CAN_COMMIT,
+         * if currentPhase is not SUBMITTED throws IllegalStateException.
+         *
+         * @return List of all cohorts futures from can commit phase.
+         *
+         */
+        private ListenableFuture<?>[] canCommitAll() {
+            changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
+
+            final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+            int i = 0;
+            for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+                ops[i++] = cohort.canCommit();
             }
+            return ops;
         }
 
         /**
@@ -230,7 +235,39 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
          *
          */
         private void preCommitBlocking() throws TransactionCommitFailedException {
-            preCommitAll().checkedGet();
+            final ListenableFuture<?>[] preCommitFutures = preCommitAll();
+            try {
+                for(ListenableFuture<?> future : preCommitFutures) {
+                    future.get();
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
+            }
+        }
+
+        /**
+         *
+         * Invokes preCommit on underlying cohorts and returns future
+         * which will complete once all preCommit on cohorts completed or
+         * failed.
+         *
+         *
+         * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
+         * state is not CAN_COMMIT
+         * throws IllegalStateException.
+         *
+         * @return List of all cohorts futures from can commit phase.
+         *
+         */
+        private ListenableFuture<?>[] preCommitAll() {
+            changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
+
+            final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+            int i = 0;
+            for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+                ops[i++] = cohort.preCommit();
+            }
+            return ops;
         }
 
         /**
@@ -246,7 +283,37 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
          *
          */
         private void commitBlocking() throws TransactionCommitFailedException {
-            commitAll().checkedGet();
+            final ListenableFuture<?>[] commitFutures = commitAll();
+            try {
+                for(ListenableFuture<?> future : commitFutures) {
+                    future.get();
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
+            }
+        }
+
+        /**
+         *
+         * Invokes commit on underlying cohorts and returns future which
+         * completes
+         * once all commits on cohorts are completed.
+         *
+         * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
+         * IllegalStateException
+         *
+         * @return List of all cohorts futures from can commit phase.
+         *
+         */
+        private ListenableFuture<?>[] commitAll() {
+            changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
+
+            final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+            int i = 0;
+            for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+                ops[i++] = cohort.commit();
+            }
+            return ops;
         }
 
         /**
@@ -284,100 +351,6 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
             Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
         }
 
-        /**
-         *
-         * Invokes preCommit on underlying cohorts and returns future
-         * which will complete once all preCommit on cohorts completed or
-         * failed.
-         *
-         *
-         * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
-         * state is not CAN_COMMIT
-         * throws IllegalStateException.
-         *
-         * @return Future which will complete once all cohorts completed
-         *         preCommit.
-         *         Future throws TransactionCommitFailedException
-         *         If any of cohorts failed preCommit
-         *
-         */
-        private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
-            changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
-            Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
-            for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
-                ops.add(cohort.preCommit());
-            }
-            /*
-             * We are returing all futures as list, not only succeeded ones in
-             * order to fail composite future if any of them failed.
-             * See Futures.allAsList for this description.
-             */
-            @SuppressWarnings({ "unchecked", "rawtypes" })
-            ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
-            return MappingCheckedFuture.create(compositeResult,
-                                         TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
-        }
-
-        /**
-         *
-         * Invokes commit on underlying cohorts and returns future which
-         * completes
-         * once all commits on cohorts are completed.
-         *
-         * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
-         * IllegalStateException
-         *
-         * @return Future which will complete once all cohorts completed
-         *         commit.
-         *         Future throws TransactionCommitFailedException
-         *         If any of cohorts failed preCommit
-         *
-         */
-        private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
-            changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
-            Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
-            for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
-                ops.add(cohort.commit());
-            }
-            /*
-             * We are returing all futures as list, not only succeeded ones in
-             * order to fail composite future if any of them failed.
-             * See Futures.allAsList for this description.
-             */
-            @SuppressWarnings({ "unchecked", "rawtypes" })
-            ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
-            return MappingCheckedFuture.create(compositeResult,
-                                     TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
-        }
-
-        /**
-         *
-         * Invokes canCommit on underlying cohorts and returns composite future
-         * which will contains {@link Boolean#TRUE} only and only if
-         * all cohorts returned true.
-         *
-         * Valid state transition is from SUBMITTED to CAN_COMMIT,
-         * if currentPhase is not SUBMITTED throws IllegalStateException.
-         *
-         * @return Future which will complete once all cohorts completed
-         *         preCommit.
-         *         Future throws TransactionCommitFailedException
-         *         If any of cohorts failed preCommit
-         *
-         */
-        private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
-            changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
-            Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
-            for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
-                canCommitOperations.add(cohort.canCommit());
-            }
-            ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
-            ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
-            return MappingCheckedFuture.create(allSuccessFuture,
-                                       TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
-
-        }
-
         /**
          *
          * Invokes abort on underlying cohorts and returns future which
@@ -390,17 +363,20 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
          */
         private ListenableFuture<Void> abortAsyncAll() {
             changeStateFrom(currentPhase, CommitPhase.ABORT);
-            Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+
+            final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+            int i = 0;
             for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
-                ops.add(cohort.abort());
+                ops[i++] = cohort.abort();
             }
+
             /*
              * We are returing all futures as list, not only succeeded ones in
              * order to fail composite future if any of them failed.
              * See Futures.allAsList for this description.
              */
             @SuppressWarnings({ "unchecked", "rawtypes" })
-            ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
+            ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops);
             return compositeResult;
         }