BUG-650: Split out CommitCoordinationTask
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / CommitCoordinationTask.java
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/CommitCoordinationTask.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/CommitCoordinationTask.java
new file mode 100644 (file)
index 0000000..e0ac702
--- /dev/null
@@ -0,0 +1,274 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.DurationStatisticsTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of blocking three-phase commit-coordination tasks without
+ * support of cancellation.
+ */
+final class CommitCoordinationTask implements Callable<Void> {
+    private static enum Phase {
+        canCommit,
+        preCommit,
+        doCommit,
+    };
+
+    private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
+    private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
+    private final DurationStatisticsTracker commitStatTracker;
+    private final DOMDataWriteTransaction tx;
+    private final int cohortSize;
+
+    public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
+            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
+            final DurationStatisticsTracker commitStatTracker) {
+        this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
+        this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
+        this.commitStatTracker = commitStatTracker;
+        this.cohortSize = Iterables.size(cohorts);
+    }
+
+    @Override
+    public Void call() throws TransactionCommitFailedException {
+        final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
+
+        Phase phase = Phase.canCommit;
+
+        try {
+            LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
+            canCommitBlocking();
+
+            phase = Phase.preCommit;
+            LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
+            preCommitBlocking();
+
+            phase = Phase.doCommit;
+            LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
+            commitBlocking();
+
+            LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
+            return null;
+        } catch (TransactionCommitFailedException e) {
+            LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
+            abortBlocking(e);
+            throw e;
+        } finally {
+            if (commitStatTracker != null) {
+                commitStatTracker.addDuration(System.nanoTime() - startTime);
+            }
+        }
+    }
+
+    /**
+     *
+     * Invokes canCommit on underlying cohorts and blocks till
+     * all results are returned.
+     *
+     * Valid state transition is from SUBMITTED to CAN_COMMIT,
+     * if currentPhase is not SUBMITTED throws IllegalStateException.
+     *
+     * @throws TransactionCommitFailedException
+     *             If one of cohorts failed can Commit
+     *
+     */
+    private void canCommitBlocking() throws TransactionCommitFailedException {
+        for (ListenableFuture<?> canCommit : canCommitAll()) {
+            try {
+                final Boolean result = (Boolean)canCommit.get();
+                if (result == null || !result) {
+                    throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
+            }
+        }
+    }
+
+    /**
+     *
+     * Invokes canCommit on underlying cohorts and returns composite future
+     * which will contains {@link Boolean#TRUE} only and only if
+     * all cohorts returned true.
+     *
+     * Valid state transition is from SUBMITTED to CAN_COMMIT,
+     * if currentPhase is not SUBMITTED throws IllegalStateException.
+     *
+     * @return List of all cohorts futures from can commit phase.
+     *
+     */
+    private ListenableFuture<?>[] canCommitAll() {
+        final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+        int i = 0;
+        for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+            ops[i++] = cohort.canCommit();
+        }
+        return ops;
+    }
+
+    /**
+     *
+     * Invokes preCommit on underlying cohorts and blocks till
+     * all results are returned.
+     *
+     * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
+     * state is not CAN_COMMIT
+     * throws IllegalStateException.
+     *
+     * @throws TransactionCommitFailedException
+     *             If one of cohorts failed preCommit
+     *
+     */
+    private void preCommitBlocking() throws TransactionCommitFailedException {
+        final ListenableFuture<?>[] preCommitFutures = preCommitAll();
+        try {
+            for(ListenableFuture<?> future : preCommitFutures) {
+                future.get();
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
+        }
+    }
+
+    /**
+     *
+     * Invokes preCommit on underlying cohorts and returns future
+     * which will complete once all preCommit on cohorts completed or
+     * failed.
+     *
+     *
+     * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
+     * state is not CAN_COMMIT
+     * throws IllegalStateException.
+     *
+     * @return List of all cohorts futures from can commit phase.
+     *
+     */
+    private ListenableFuture<?>[] preCommitAll() {
+        final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+        int i = 0;
+        for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+            ops[i++] = cohort.preCommit();
+        }
+        return ops;
+    }
+
+    /**
+     *
+     * Invokes commit on underlying cohorts and blocks till
+     * all results are returned.
+     *
+     * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
+     * IllegalStateException.
+     *
+     * @throws TransactionCommitFailedException
+     *             If one of cohorts failed preCommit
+     *
+     */
+    private void commitBlocking() throws TransactionCommitFailedException {
+        final ListenableFuture<?>[] commitFutures = commitAll();
+        try {
+            for(ListenableFuture<?> future : commitFutures) {
+                future.get();
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
+        }
+    }
+
+    /**
+     *
+     * Invokes commit on underlying cohorts and returns future which
+     * completes
+     * once all commits on cohorts are completed.
+     *
+     * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
+     * IllegalStateException
+     *
+     * @return List of all cohorts futures from can commit phase.
+     */
+    private ListenableFuture<?>[] commitAll() {
+        final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+        int i = 0;
+        for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+            ops[i++] = cohort.commit();
+        }
+        return ops;
+    }
+
+    /**
+     * Aborts transaction.
+     *
+     * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
+     * cohorts, blocks
+     * for all results. If any of the abort failed throws
+     * IllegalStateException,
+     * which will contains originalCause as suppressed Exception.
+     *
+     * If aborts we're successful throws supplied exception
+     *
+     * @param originalCause
+     *            Exception which should be used to fail transaction for
+     *            consumers of transaction
+     *            future and listeners of transaction failure.
+     * @param phase phase in which the problem ensued
+     * @throws TransactionCommitFailedException
+     *             on invocation of this method.
+     *             originalCa
+     * @throws IllegalStateException
+     *             if abort failed.
+     */
+    private void abortBlocking(final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException {
+        Exception cause = originalCause;
+        try {
+            abortAsyncAll().get();
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
+            cause = new IllegalStateException("Abort failed.", e);
+            cause.addSuppressed(e);
+        }
+        Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
+    }
+
+    /**
+     * Invokes abort on underlying cohorts and returns future which
+     * completes once all abort on cohorts are completed.
+     *
+     * @return Future which will complete once all cohorts completed
+     *         abort.
+     */
+    private ListenableFuture<Void> abortAsyncAll() {
+
+        final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+        int i = 0;
+        for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+            ops[i++] = cohort.abort();
+        }
+
+        /*
+         * We are returning all futures as list, not only succeeded ones in
+         * order to fail composite future if any of them failed.
+         * See Futures.allAsList for this description.
+         */
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops);
+        return compositeResult;
+    }
+}