package org.opendaylight.controller.md.sal.dom.broker.impl;
import static com.google.common.base.Preconditions.checkState;
-import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.EnumMap;
public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts);
- return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> absent());
+ return coordinator.submit(transaction, cohorts);
}
}
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
* {@link org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType} type.
*
*/
-public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory<DOMStoreTransactionChain>
- implements DOMTransactionChain, DOMDataCommitErrorListener {
+final class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory<DOMStoreTransactionChain>
+ implements DOMTransactionChain {
+ private static enum State {
+ RUNNING,
+ CLOSING,
+ CLOSED,
+ FAILED,
+ }
+ private static final AtomicIntegerFieldUpdater<DOMDataBrokerTransactionChainImpl> COUNTER_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, "counter");
+ private static final AtomicReferenceFieldUpdater<DOMDataBrokerTransactionChainImpl, State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, State.class, "state");
private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerTransactionChainImpl.class);
private final AtomicLong txNum = new AtomicLong();
private final DOMDataCommitExecutor coordinator;
private final TransactionChainListener listener;
private final long chainId;
- private volatile boolean failed = false;
+ private volatile State state = State.RUNNING;
+ private volatile int counter = 0;
/**
*
this.listener = Preconditions.checkNotNull(listener);
}
+ private void checkNotFailed() {
+ Preconditions.checkState(state != State.FAILED, "Transaction chain has failed");
+ }
+
@Override
protected Object newTransactionIdentifier() {
return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement();
}
@Override
- public CheckedFuture<Void,TransactionCommitFailedException> submit(
+ public CheckedFuture<Void, TransactionCommitFailedException> submit(
final DOMDataWriteTransaction transaction, final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
+ checkNotFailed();
checkNotClosed();
- return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> of(this));
+ final CheckedFuture<Void, TransactionCommitFailedException> ret = coordinator.submit(transaction, cohorts);
+
+ COUNTER_UPDATER.incrementAndGet(this);
+ Futures.addCallback(ret, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ transactionCompleted();
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ transactionFailed(transaction, t);
+ }
+ });
+
+ return ret;
}
@Override
public void close() {
- super.close();
+ final boolean success = STATE_UPDATER.compareAndSet(this, State.RUNNING, State.CLOSING);
+ if (!success) {
+ LOG.debug("Chain {} is no longer running", this);
+ return;
+ }
+ super.close();
for (DOMStoreTransactionChain subChain : getTxFactories().values()) {
subChain.close();
}
- if (!failed) {
- LOG.debug("Transaction chain {} successfully finished.", this);
- // FIXME: this event should be emitted once all operations complete
- listener.onTransactionChainSuccessful(this);
+ if (counter == 0) {
+ finishClose();
}
}
- @Override
- public void onCommitFailed(final DOMDataWriteTransaction tx, final Throwable cause) {
- failed = true;
+ private void finishClose() {
+ state = State.CLOSED;
+ listener.onTransactionChainSuccessful(this);
+ }
+
+ private void transactionCompleted() {
+ if (COUNTER_UPDATER.decrementAndGet(this) == 0 && state == State.CLOSING) {
+ finishClose();
+ }
+ }
+
+ private void transactionFailed(final DOMDataWriteTransaction tx, final Throwable cause) {
+ state = State.FAILED;
LOG.debug("Transaction chain {} failed.", this, cause);
listener.onTransactionChainFailed(this, tx, cause);
}
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
@Override
public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
- final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
+ final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
- Preconditions.checkArgument(listener != null, "Listener must not be null");
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
ListenableFuture<Void> commitFuture = null;
try {
commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
- listener, commitStatsTracker));
+ commitStatsTracker));
} catch(RejectedExecutionException e) {
LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
executor, e);
"Could not submit the commit task - the commit queue capacity has been exceeded.", e));
}
- if (listener.isPresent()) {
- Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
- }
-
return MappingCheckedFuture.create(commitFuture,
TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
}
public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
- final Optional<DOMDataCommitErrorListener> listener,
final DurationStatsTracker commitStatTracker) {
this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.broker.impl;
-
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-
-/**
- *
- * Utility implemetation of {@link FutureCallback} which is responsible
- * for invoking {@link DOMDataCommitErrorListener} on TransactionCommit failed.
- *
- * When {@link #onFailure(Throwable)} is invoked, supplied {@link DOMDataCommitErrorListener}
- * callback is invoked with associated transaction and throwable is invoked on listener.
- *
- */
-class DOMDataCommitErrorInvoker implements FutureCallback<Void> {
-
- private final DOMDataWriteTransaction tx;
- private final DOMDataCommitErrorListener listener;
-
-
- /**
- *
- * Construct new DOMDataCommitErrorInvoker.
- *
- * @param transaction Transaction which should be passed as argument to {@link DOMDataCommitErrorListener#onCommitFailed(DOMDataWriteTransaction, Throwable)}
- * @param listener Listener which should be invoked on error.
- */
- public DOMDataCommitErrorInvoker(DOMDataWriteTransaction transaction, DOMDataCommitErrorListener listener) {
- this.tx = Preconditions.checkNotNull(transaction, "Transaction must not be null");
- this.listener = Preconditions.checkNotNull(listener, "Listener must not be null");
- }
-
- @Override
- public void onFailure(Throwable t) {
- listener.onCommitFailed(tx, t);
- }
-
- @Override
- public void onSuccess(Void result) {
- // NOOP
- }
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.broker.impl;
-
-import java.util.EventListener;
-
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-
-/**
- *
- * Listener on transaction failure which may be passed to
- * {@link DOMDataCommitExecutor}. This listener is notified during transaction
- * processing, before result is delivered to other client code outside MD-SAL.
- * This allows implementors to update their internal state before transaction
- * failure is visible to client code.
- *
- * This is internal API for MD-SAL implementations, for consumer facing error
- * listeners see {@link org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener}.
- *
- */
-interface DOMDataCommitErrorListener extends EventListener {
-
- /**
- *
- * Callback which is invoked on transaction failure during three phase
- * commit in {@link DOMDataCommitExecutor}.
- *
- *
- * Implementation of this callback MUST NOT do any blocking calls or any
- * calls to MD-SAL, since this callback is invoked synchronously on MD-SAL
- * Broker coordination thread.
- *
- * @param tx
- * Transaction which failed
- * @param cause
- * Failure reason
- */
- void onCommitFailed(DOMDataWriteTransaction tx, Throwable cause);
-
-}
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
+import com.google.common.util.concurrent.CheckedFuture;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
/**
* Executor of Three Phase Commit coordination for
* Transaction to be used as context for reporting
* @param cohort
* DOM Store cohorts representing provided transaction, its
- * subtransactoins.
- * @param listener
- * Error listener which should be notified if transaction failed.
+ * subtransactions.
* @return a CheckedFuture. if commit coordination on cohorts finished successfully,
* nothing is returned from the Future, On failure,
* the Future fails with a {@link TransactionCommitFailedException}.
*
*/
CheckedFuture<Void,TransactionCommitFailedException> submit(DOMDataWriteTransaction tx,
- Iterable<DOMStoreThreePhaseCommitCohort> cohort, Optional<DOMDataCommitErrorListener> listener);
+ Iterable<DOMStoreThreePhaseCommitCohort> cohort);
}