From 976cf80f542309baf9d5e0ec82f4f6991f309f47 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 18 Sep 2014 19:35:52 +0200 Subject: [PATCH] BUG-1845: implement proper shutdown sequence The notification chain should only invoke the successful callback once all transactions have been committed successfully. This patch does precisely that by tracking the number of outstanding transactions. Since this requires notification about successful transactions, too, we get rid of the unneeded DOMDataCommitErrorInvoker/Listener abstraction. Change-Id: I38534a3fb79a8a461059504de7b0cdd48348afef Signed-off-by: Robert Varga --- .../dom/broker/impl/DOMDataBrokerImpl.java | 3 +- .../DOMDataBrokerTransactionChainImpl.java | 75 +++++++++++++++---- .../impl/DOMDataCommitCoordinatorImpl.java | 11 +-- .../impl/DOMDataCommitErrorInvoker.java | 49 ------------ .../impl/DOMDataCommitErrorListener.java | 45 ----------- .../broker/impl/DOMDataCommitExecutor.java | 9 +-- 6 files changed, 67 insertions(+), 125 deletions(-) delete mode 100644 opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java delete mode 100644 opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java index 8ed5206132..5fbf1270cc 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.md.sal.dom.broker.impl; import static com.google.common.base.Preconditions.checkState; -import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListeningExecutorService; import java.util.EnumMap; @@ -102,6 +101,6 @@ public class DOMDataBrokerImpl extends AbstractDOMForwardedTransactionFactory submit(final DOMDataWriteTransaction transaction, final Iterable cohorts) { LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts); - return coordinator.submit(transaction, cohorts, Optional. absent()); + return coordinator.submit(transaction, cohorts); } } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java index 7cd6afa466..0b1dd1c5e0 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java @@ -6,11 +6,14 @@ */ package org.opendaylight.controller.md.sal.dom.broker.impl; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import java.util.Map; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; @@ -27,16 +30,27 @@ import org.slf4j.LoggerFactory; * {@link org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType} type. * */ -public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory - implements DOMTransactionChain, DOMDataCommitErrorListener { +final class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory + implements DOMTransactionChain { + private static enum State { + RUNNING, + CLOSING, + CLOSED, + FAILED, + } + private static final AtomicIntegerFieldUpdater COUNTER_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, "counter"); + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, State.class, "state"); private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerTransactionChainImpl.class); private final AtomicLong txNum = new AtomicLong(); private final DOMDataCommitExecutor coordinator; private final TransactionChainListener listener; private final long chainId; - private volatile boolean failed = false; + private volatile State state = State.RUNNING; + private volatile int counter = 0; /** * @@ -62,37 +76,70 @@ public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTrans this.listener = Preconditions.checkNotNull(listener); } + private void checkNotFailed() { + Preconditions.checkState(state != State.FAILED, "Transaction chain has failed"); + } + @Override protected Object newTransactionIdentifier() { return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement(); } @Override - public CheckedFuture submit( + public CheckedFuture submit( final DOMDataWriteTransaction transaction, final Iterable cohorts) { + checkNotFailed(); checkNotClosed(); - return coordinator.submit(transaction, cohorts, Optional. of(this)); + final CheckedFuture ret = coordinator.submit(transaction, cohorts); + + COUNTER_UPDATER.incrementAndGet(this); + Futures.addCallback(ret, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + transactionCompleted(); + } + + @Override + public void onFailure(final Throwable t) { + transactionFailed(transaction, t); + } + }); + + return ret; } @Override public void close() { - super.close(); + final boolean success = STATE_UPDATER.compareAndSet(this, State.RUNNING, State.CLOSING); + if (!success) { + LOG.debug("Chain {} is no longer running", this); + return; + } + super.close(); for (DOMStoreTransactionChain subChain : getTxFactories().values()) { subChain.close(); } - if (!failed) { - LOG.debug("Transaction chain {} successfully finished.", this); - // FIXME: this event should be emitted once all operations complete - listener.onTransactionChainSuccessful(this); + if (counter == 0) { + finishClose(); } } - @Override - public void onCommitFailed(final DOMDataWriteTransaction tx, final Throwable cause) { - failed = true; + private void finishClose() { + state = State.CLOSED; + listener.onTransactionChainSuccessful(this); + } + + private void transactionCompleted() { + if (COUNTER_UPDATER.decrementAndGet(this) == 0 && state == State.CLOSING) { + finishClose(); + } + } + + private void transactionFailed(final DOMDataWriteTransaction tx, final Throwable cause) { + state = State.FAILED; LOG.debug("Transaction chain {} failed.", this, cause); listener.onTransactionChainFailed(this, tx, cause); } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java index 77cf105ed6..15d7b1d966 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java @@ -6,7 +6,6 @@ */ package org.opendaylight.controller.md.sal.dom.broker.impl; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; @@ -63,16 +62,15 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { @Override public CheckedFuture submit(final DOMDataWriteTransaction transaction, - final Iterable cohorts, final Optional listener) { + final Iterable cohorts) { Preconditions.checkArgument(transaction != null, "Transaction must not be null."); Preconditions.checkArgument(cohorts != null, "Cohorts must not be null."); - Preconditions.checkArgument(listener != null, "Listener must not be null"); LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier()); ListenableFuture commitFuture = null; try { commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, - listener, commitStatsTracker)); + commitStatsTracker)); } catch(RejectedExecutionException e) { LOG.error("The commit executor's queue is full - submit task was rejected. \n" + executor, e); @@ -81,10 +79,6 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { "Could not submit the commit task - the commit queue capacity has been exceeded.", e)); } - if (listener.isPresent()) { - Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get())); - } - return MappingCheckedFuture.create(commitFuture, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); } @@ -141,7 +135,6 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { public CommitCoordinationTask(final DOMDataWriteTransaction transaction, final Iterable cohorts, - final Optional listener, final DurationStatsTracker commitStatTracker) { this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null"); this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null"); diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java deleted file mode 100644 index 5ce9241dd2..0000000000 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.md.sal.dom.broker.impl; - -import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FutureCallback; - -/** - * - * Utility implemetation of {@link FutureCallback} which is responsible - * for invoking {@link DOMDataCommitErrorListener} on TransactionCommit failed. - * - * When {@link #onFailure(Throwable)} is invoked, supplied {@link DOMDataCommitErrorListener} - * callback is invoked with associated transaction and throwable is invoked on listener. - * - */ -class DOMDataCommitErrorInvoker implements FutureCallback { - - private final DOMDataWriteTransaction tx; - private final DOMDataCommitErrorListener listener; - - - /** - * - * Construct new DOMDataCommitErrorInvoker. - * - * @param transaction Transaction which should be passed as argument to {@link DOMDataCommitErrorListener#onCommitFailed(DOMDataWriteTransaction, Throwable)} - * @param listener Listener which should be invoked on error. - */ - public DOMDataCommitErrorInvoker(DOMDataWriteTransaction transaction, DOMDataCommitErrorListener listener) { - this.tx = Preconditions.checkNotNull(transaction, "Transaction must not be null"); - this.listener = Preconditions.checkNotNull(listener, "Listener must not be null"); - } - - @Override - public void onFailure(Throwable t) { - listener.onCommitFailed(tx, t); - } - - @Override - public void onSuccess(Void result) { - // NOOP - } -} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java deleted file mode 100644 index 80bc6696f0..0000000000 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.md.sal.dom.broker.impl; - -import java.util.EventListener; - -import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; - -/** - * - * Listener on transaction failure which may be passed to - * {@link DOMDataCommitExecutor}. This listener is notified during transaction - * processing, before result is delivered to other client code outside MD-SAL. - * This allows implementors to update their internal state before transaction - * failure is visible to client code. - * - * This is internal API for MD-SAL implementations, for consumer facing error - * listeners see {@link org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener}. - * - */ -interface DOMDataCommitErrorListener extends EventListener { - - /** - * - * Callback which is invoked on transaction failure during three phase - * commit in {@link DOMDataCommitExecutor}. - * - * - * Implementation of this callback MUST NOT do any blocking calls or any - * calls to MD-SAL, since this callback is invoked synchronously on MD-SAL - * Broker coordination thread. - * - * @param tx - * Transaction which failed - * @param cause - * Failure reason - */ - void onCommitFailed(DOMDataWriteTransaction tx, Throwable cause); - -} diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java index 234758ca75..8aa97e72d1 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java @@ -7,11 +7,10 @@ */ package org.opendaylight.controller.md.sal.dom.broker.impl; +import com.google.common.util.concurrent.CheckedFuture; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; /** * Executor of Three Phase Commit coordination for @@ -35,15 +34,13 @@ interface DOMDataCommitExecutor { * Transaction to be used as context for reporting * @param cohort * DOM Store cohorts representing provided transaction, its - * subtransactoins. - * @param listener - * Error listener which should be notified if transaction failed. + * subtransactions. * @return a CheckedFuture. if commit coordination on cohorts finished successfully, * nothing is returned from the Future, On failure, * the Future fails with a {@link TransactionCommitFailedException}. * */ CheckedFuture submit(DOMDataWriteTransaction tx, - Iterable cohort, Optional listener); + Iterable cohort); } -- 2.36.6