X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FDOMDataCommitCoordinatorImpl.java;h=15d7b1d966e1a59e028616bb85f5670f5805860b;hp=d796ab35fa88522bd7f89d4802adaffe9c3d0a8e;hb=b3e553ce5b3d3e972cbe19465ab7af2fcb39934c;hpb=3d218be158cc7e36503c75723e54d96203f9a420 diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java index d796ab35fa..15d7b1d966 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java @@ -6,7 +6,6 @@ */ package org.opendaylight.controller.md.sal.dom.broker.impl; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; @@ -17,7 +16,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; -import javax.annotation.concurrent.GuardedBy; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -63,16 +62,15 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { @Override public CheckedFuture submit(final DOMDataWriteTransaction transaction, - final Iterable cohorts, final Optional listener) { + final Iterable cohorts) { Preconditions.checkArgument(transaction != null, "Transaction must not be null."); Preconditions.checkArgument(cohorts != null, "Cohorts must not be null."); - Preconditions.checkArgument(listener != null, "Listener must not be null"); LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier()); ListenableFuture commitFuture = null; try { commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, - listener, commitStatsTracker)); + commitStatsTracker)); } catch(RejectedExecutionException e) { LOG.error("The commit executor's queue is full - submit task was rejected. \n" + executor, e); @@ -81,10 +79,6 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { "Could not submit the commit task - the commit queue capacity has been exceeded.", e)); } - if (listener.isPresent()) { - Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get())); - } - return MappingCheckedFuture.create(commitFuture, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); } @@ -127,47 +121,43 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { } /** - * * Implementation of blocking three-phase commit-coordination tasks without - * support of cancelation. - * + * support of cancellation. */ - private static class CommitCoordinationTask implements Callable { - + private static final class CommitCoordinationTask implements Callable { + private static final AtomicReferenceFieldUpdater PHASE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase"); private final DOMDataWriteTransaction tx; private final Iterable cohorts; private final DurationStatsTracker commitStatTracker; private final int cohortSize; - - @GuardedBy("this") - private CommitPhase currentPhase; + private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED; public CommitCoordinationTask(final DOMDataWriteTransaction transaction, final Iterable cohorts, - final Optional listener, final DurationStatsTracker commitStatTracker) { this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null"); this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null"); - this.currentPhase = CommitPhase.SUBMITTED; this.commitStatTracker = commitStatTracker; this.cohortSize = Iterables.size(cohorts); } @Override public Void call() throws TransactionCommitFailedException { + final long startTime = commitStatTracker != null ? System.nanoTime() : 0; - long startTime = System.nanoTime(); try { canCommitBlocking(); preCommitBlocking(); commitBlocking(); return null; } catch (TransactionCommitFailedException e) { - LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e); - abortBlocking(e); + final CommitPhase phase = currentPhase; + LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e); + abortBlocking(e, phase); throw e; } finally { - if(commitStatTracker != null) { + if (commitStatTracker != null) { commitStatTracker.addDuration(System.nanoTime() - startTime); } } @@ -331,18 +321,19 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { * Exception which should be used to fail transaction for * consumers of transaction * future and listeners of transaction failure. + * @param phase phase in which the problem ensued * @throws TransactionCommitFailedException * on invocation of this method. * originalCa * @throws IllegalStateException * if abort failed. */ - private void abortBlocking(final TransactionCommitFailedException originalCause) + private void abortBlocking(final TransactionCommitFailedException originalCause, final CommitPhase phase) throws TransactionCommitFailedException { - LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause); + LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, originalCause); Exception cause = originalCause; try { - abortAsyncAll().get(); + abortAsyncAll(phase).get(); } catch (InterruptedException | ExecutionException e) { LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e); cause = new IllegalStateException("Abort failed.", e); @@ -352,17 +343,15 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { } /** - * * Invokes abort on underlying cohorts and returns future which - * completes - * once all abort on cohorts are completed. + * completes once all abort on cohorts are completed. * + * @param phase phase in which the problem ensued * @return Future which will complete once all cohorts completed * abort. - * */ - private ListenableFuture abortAsyncAll() { - changeStateFrom(currentPhase, CommitPhase.ABORT); + private ListenableFuture abortAsyncAll(final CommitPhase phase) { + changeStateFrom(phase, CommitPhase.ABORT); final ListenableFuture[] ops = new ListenableFuture[cohortSize]; int i = 0; @@ -371,7 +360,7 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { } /* - * We are returing all futures as list, not only succeeded ones in + * We are returning all futures as list, not only succeeded ones in * order to fail composite future if any of them failed. * See Futures.allAsList for this description. */ @@ -399,14 +388,13 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { * @throws IllegalStateException * If currentState of task does not match expected state */ - private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) { - Preconditions.checkState(currentPhase.equals(currentExpected), - "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(), - currentPhase, newState); - LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState); - currentPhase = newState; - }; + private void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) { + final boolean success = PHASE_UPDATER.compareAndSet(this, currentExpected, newState); + Preconditions.checkState(success, "Invalid state transition: Tx: %s expected: %s current: %s target: %s", + tx.getIdentifier(), currentExpected, currentPhase, newState); + LOG.debug("Transaction {}: Phase {} Started", tx.getIdentifier(), newState); + }; } }