From: Tony Tkacik Date: Mon, 26 May 2014 10:04:45 +0000 (+0200) Subject: Bug 1073: Added support to DOMBrokerImpl for Transaction Chaining X-Git-Tag: release/helium~657^2~1 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=2cf314976a3f81c7115f94c44fb37e967f8a4426 Bug 1073: Added support to DOMBrokerImpl for Transaction Chaining Splitted functionality of DOMBrokerImpl to separate classes which allows easier code reuse with transaction chaining: New internal APIs: DOMDataCommitExecutor - Commit executor which invokes three-phase commit coordination DOMDataCommitErrorListener - Error listener for commit executor and one execution AbstractDOMForwardedTransactionFactory - Factory which creates composite transactions on top of DOMStore transactions DOMDataCommitCoordinatorImpl - Commit Executor implementation DOMBrokerTransactionChainImpl - Implementation of DOMTransactionChain. Added 2 JUnit tests for Transaction Chains: - Test positive scenario (chain of write, read, delete transactions) - Test IllegalStateException when previous transaction was not commited. Change-Id: Ic2290d7fb3d4ea52a44bea02b493c1e537e929a6 Signed-off-by: Tony Tkacik --- diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataBroker.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataBroker.java index dbaba294aa..c120508f87 100644 --- a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataBroker.java +++ b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataBroker.java @@ -1,6 +1,5 @@ /* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html @@ -8,17 +7,45 @@ package org.opendaylight.controller.md.sal.dom.api; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainFactory; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.sal.core.api.BrokerService; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -public interface DOMDataBroker extends AsyncDataBroker, DOMDataChangeListener>, BrokerService { +/** + * Data Broker which provides data transaction and data change listener fuctionality + * using {@link NormalizedNode} data format. + * + * This interface is type capture of generic interfaces and returns type captures + * of results for client-code convenience. + * + */ +public interface DOMDataBroker extends + AsyncDataBroker, DOMDataChangeListener>, + TransactionChainFactory>, BrokerService { + + /** + * {@inheritDoc} + */ @Override DOMDataReadTransaction newReadOnlyTransaction(); + /** + * {@inheritDoc} + */ @Override DOMDataReadWriteTransaction newReadWriteTransaction(); + /** + * {@inheritDoc} + */ @Override DOMDataWriteTransaction newWriteOnlyTransaction(); + + /** + * {@inheritDoc} + */ + @Override + DOMTransactionChain createTransactionChain(TransactionChainListener listener); } diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMTransactionChain.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMTransactionChain.java new file mode 100644 index 0000000000..b894911ffa --- /dev/null +++ b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMTransactionChain.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.api; + +import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +/** + * A chain of DOM Data transactions. + * + * Transactions in a chain need to be committed in sequence and each + * transaction should see the effects of previous transactions as if they happened. A chain + * makes no guarantees of atomicity, in fact transactions are committed as soon as possible. + * + *

+ * This interface is type capture of {@link TransactionChain} for DOM Data Contracts. + */ +public interface DOMTransactionChain extends TransactionChain> { + + @Override + DOMDataReadTransaction newReadOnlyTransaction(); + + @Override + DOMDataReadWriteTransaction newReadWriteTransaction(); + + @Override + DOMDataWriteTransaction newWriteOnlyTransaction(); + +} diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMForwardedCompositeTransaction.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMForwardedCompositeTransaction.java new file mode 100644 index 0000000000..0c07b0684c --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMForwardedCompositeTransaction.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; + +/** + * Composite DOM Transaction backed by {@link DOMStoreTransaction}. + * + * Abstract base for composite transaction, which provides access only to common + * functionality as retrieval of subtransaction, close method and retrieval of + * identifier. + * + * @param + * Subtransaction distinguisher + * @param + * Subtransaction type + */ +abstract class AbstractDOMForwardedCompositeTransaction implements + AsyncTransaction> { + + private final ImmutableMap backingTxs; + private final Object identifier; + + /** + * + * Creates new composite Transactions. + * + * @param identifier + * Identifier of transaction. + * @param backingTxs + * Key,value map of backing transactions. + */ + protected AbstractDOMForwardedCompositeTransaction(final Object identifier, final ImmutableMap backingTxs) { + this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null"); + this.backingTxs = Preconditions.checkNotNull(backingTxs, "Backing transactions should not be null"); + } + + /** + * Returns subtransaction associated with supplied key. + * + * @param key + * @return + * @throws NullPointerException + * if key is null + * @throws IllegalArgumentException + * if no subtransaction is associated with key. + */ + protected final T getSubtransaction(final K key) { + Preconditions.checkNotNull(key, "key must not be null."); + Preconditions.checkArgument(backingTxs.containsKey(key), "No subtransaction associated with %s", key); + return backingTxs.get(key); + } + + /** + * Returns immutable Iterable of all subtransactions. + * + */ + protected Iterable getSubtransactions() { + return backingTxs.values(); + } + + @Override + public Object getIdentifier() { + return identifier; + } + + @Override + public void close() { + /* + * We share one exception for all failures, which are added + * as supressedExceptions to it. + * + */ + IllegalStateException failure = null; + for (T subtransaction : backingTxs.values()) { + try { + subtransaction.close(); + } catch (Exception e) { + // If we did not allocated failure we allocate it + if(failure == null) { + failure = new IllegalStateException("Uncaught exception occured during closing transaction.", e); + } else { + // We update it with addotional exceptions, which occured during error. + failure.addSuppressed(e); + } + } + } + // If we have failure, we throw it at after all attempts to close. + if(failure != null) { + throw failure; + } + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMForwardedTransactionFactory.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMForwardedTransactionFactory.java new file mode 100644 index 0000000000..7b5ea11dbb --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/AbstractDOMForwardedTransactionFactory.java @@ -0,0 +1,225 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import java.util.Map; +import java.util.Map.Entry; + +import javax.annotation.concurrent.GuardedBy; + +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; + +/** + * + * Abstract composite transaction factory. + * + * Provides an convenience common implementation for composite DOM Transactions, + * where subtransaction is identified by {@link LogicalDatastoreType} type and + * implementation of subtransaction is provided by + * {@link DOMStoreTransactionFactory}. + * + * Note:This class does not have thread-safe implementation of {@link #close()}, + * implementation may allow accessing and allocating new transactions during closing + * this instance. + * + * @param + * Type of {@link DOMStoreTransactionFactory} factory. + */ +public abstract class AbstractDOMForwardedTransactionFactory implements DOMDataCommitImplementation, AutoCloseable { + + private final ImmutableMap storeTxFactories; + + private boolean closed; + + protected AbstractDOMForwardedTransactionFactory(final Map txFactories) { + this.storeTxFactories = ImmutableMap.copyOf(txFactories); + } + + /** + * Implementations must return unique identifier for each and every call of + * this method; + * + * @return new Unique transaction identifier. + */ + protected abstract Object newTransactionIdentifier(); + + /** + * Creates a new composite read-only transaction + * + * Creates a new composite read-only transaction backed by one transaction + * per factory in {@link #getTxFactories()}. + * + * Subtransaction for reading is selected by supplied + * {@link LogicalDatastoreType} as parameter for + * {@link DOMDataReadTransaction#read(LogicalDatastoreType, InstanceIdentifier)} + * . + * + * Id of returned transaction is retrieved via + * {@link #newTransactionIdentifier()}. + * + * @return New composite read-only transaction. + */ + public DOMDataReadTransaction newReadOnlyTransaction() { + checkNotClosed(); + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (Entry store : storeTxFactories.entrySet()) { + builder.put(store.getKey(), store.getValue().newReadOnlyTransaction()); + } + return new DOMForwardedReadOnlyTransaction(newTransactionIdentifier(), builder.build()); + } + + + + /** + * Creates a new composite write-only transaction + * + *

+ * Creates a new composite write-only transaction backed by one write-only + * transaction per factory in {@link #getTxFactories()}. + * + *

+ * Implementation of composite Write-only transaction is following: + * + *

    + *
  • + * {@link DOMDataWriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)} + * - backing subtransaction is selected by {@link LogicalDatastoreType}, + * {@link DOMStoreWriteTransaction#write(InstanceIdentifier, NormalizedNode)} + * is invoked on selected subtransaction. + *
  • + * {@link DOMDataWriteTransaction#merge(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)} + * - backing subtransaction is selected by {@link LogicalDatastoreType}, + * {@link DOMStoreWriteTransaction#merge(InstanceIdentifier, NormalizedNode)} + * is invoked on selected subtransaction. + *
  • + * {@link DOMDataWriteTransaction#delete(LogicalDatastoreType, InstanceIdentifier) + * - backing subtransaction is selected by {@link LogicalDatastoreType}, + * {@link DOMStoreWriteTransaction#delete(InstanceIdentifier)} is invoked on + * selected subtransaction. + *
  • {@link DOMDataWriteTransaction#commit()} - results in invoking + * {@link DOMStoreWriteTransaction#ready()}, gathering all resulting cohorts + * and then invoking finalized implementation callback + * {@link #commit(DOMDataWriteTransaction, Iterable)} with transaction which + * was commited and gathered results. + *
+ * + * Id of returned transaction is generated via + * {@link #newTransactionIdentifier()}. + * + * @return New composite write-only transaction associated with this + * factory. + */ + public DOMDataWriteTransaction newWriteOnlyTransaction() { + checkNotClosed(); + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (Entry store : storeTxFactories.entrySet()) { + builder.put(store.getKey(), store.getValue().newWriteOnlyTransaction()); + } + return new DOMForwardedWriteTransaction(newTransactionIdentifier(), builder.build(), + this); + } + + /** + * Creates a new composite write-only transaction + * + *

+ * Creates a new composite write-only transaction backed by one write-only + * transaction per factory in {@link #getTxFactories()}. + *

+ * Implementation of composite Write-only transaction is following: + * + *

    + *
  • + * {@link DOMDataWriteTransaction#read(LogicalDatastoreType, InstanceIdentifier)} + * - backing subtransaction is selected by {@link LogicalDatastoreType}, + * {@link DOMStoreWriteTransaction#read(InstanceIdentifier)} is invoked on + * selected subtransaction. + *
  • + * {@link DOMDataWriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)} + * - backing subtransaction is selected by {@link LogicalDatastoreType}, + * {@link DOMStoreWriteTransaction#write(InstanceIdentifier, NormalizedNode)} + * is invoked on selected subtransaction. + *
  • + * {@link DOMDataWriteTransaction#merge(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)} + * - backing subtransaction is selected by {@link LogicalDatastoreType}, + * {@link DOMStoreWriteTransaction#merge(InstanceIdentifier, NormalizedNode)} + * is invoked on selected subtransaction. + *
  • + * {@link DOMDataWriteTransaction#delete(LogicalDatastoreType, InstanceIdentifier) + * - backing subtransaction is selected by {@link LogicalDatastoreType}, + * {@link DOMStoreWriteTransaction#delete(InstanceIdentifier)} is invoked on + * selected subtransaction. + *
  • {@link DOMDataWriteTransaction#commit()} - results in invoking + * {@link DOMStoreWriteTransaction#ready()}, gathering all resulting cohorts + * and then invoking finalized implementation callback + * {@link #commit(DOMDataWriteTransaction, Iterable)} with transaction which + * was commited and gathered results. + *
  • + *
+ * + * Id of returned transaction is generated via + * {@link #newTransactionIdentifier()}. + * + * @return New composite read-write transaction associated with this + * factory. + * + */ + public DOMDataReadWriteTransaction newReadWriteTransaction() { + checkNotClosed(); + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (Entry store : storeTxFactories.entrySet()) { + builder.put(store.getKey(), store.getValue().newReadWriteTransaction()); + } + return new DOMForwardedReadWriteTransaction(newTransactionIdentifier(), builder.build(), this); + } + + /** + * Convenience accessor of backing factories intended to be used only by + * finalization of this class. + * + * Note: + * Finalization of this class may want to access other functionality of + * supplied Transaction factories. + * + * @return Map of backing transaction factories. + */ + protected final Map getTxFactories() { + return storeTxFactories; + } + + /** + * + * Checks if instance is not closed. + * + * @throws IllegalStateException If instance of this class was closed. + * + */ + @GuardedBy("this") + protected synchronized void checkNotClosed() { + Preconditions.checkState(!closed,"Transaction factory was closed. No further operations allowed."); + } + + @Override + @GuardedBy("this") + public synchronized void close() { + closed = true; + } + +} diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java index 608ac9bc68..7e37a1e3a3 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java @@ -7,340 +7,81 @@ */ package org.opendaylight.controller.md.sal.dom.broker.impl; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import java.util.Collections; -import java.util.List; import java.util.Map.Entry; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.controller.md.sal.common.api.TransactionStatus; -import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; -import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStore; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -public class DOMDataBrokerImpl implements DOMDataBroker, AutoCloseable { +public class DOMDataBrokerImpl extends AbstractDOMForwardedTransactionFactory implements DOMDataBroker, + AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerImpl.class); - private static final Logger COORDINATOR_LOG = LoggerFactory.getLogger(CommitCoordination.class); - private final ImmutableMap datastores; - private final ListeningExecutorService executor; + + private final DOMDataCommitCoordinatorImpl coordinator; private final AtomicLong txNum = new AtomicLong(); + private final AtomicLong chainNum = new AtomicLong(); public DOMDataBrokerImpl(final ImmutableMap datastores, final ListeningExecutorService executor) { - super(); - this.datastores = datastores; - this.executor = executor; + super(datastores); + this.coordinator = new DOMDataCommitCoordinatorImpl(executor); } - private static final Function, Boolean> AND_FUNCTION = new Function, Boolean>() { - - @Override - public Boolean apply(final Iterable input) { - - for (Boolean value : input) { - if (value == false) { - return Boolean.FALSE; - } - } - return Boolean.TRUE; - } - }; - @Override - public DOMDataReadTransaction newReadOnlyTransaction() { - ImmutableMap.Builder builder = ImmutableMap.builder(); - for (Entry store : datastores.entrySet()) { - builder.put(store.getKey(), store.getValue().newReadOnlyTransaction()); - } - return new ReadOnlyTransactionImpl(newTransactionIdentifier(), builder.build()); - } - - private Object newTransactionIdentifier() { + protected Object newTransactionIdentifier() { return "DOM-" + txNum.getAndIncrement(); } - @Override - public DOMDataReadWriteTransaction newReadWriteTransaction() { - ImmutableMap.Builder builder = ImmutableMap.builder(); - for (Entry store : datastores.entrySet()) { - builder.put(store.getKey(), store.getValue().newReadWriteTransaction()); - } - return new ReadWriteTransactionImpl(newTransactionIdentifier(), builder.build(), this); - } - - @Override - public DOMDataWriteTransaction newWriteOnlyTransaction() { - ImmutableMap.Builder builder = ImmutableMap.builder(); - for (Entry store : datastores.entrySet()) { - builder.put(store.getKey(), store.getValue().newWriteOnlyTransaction()); - } - return new WriteTransactionImpl(newTransactionIdentifier(), builder.build(), this); - } - @Override public ListenerRegistration registerDataChangeListener(final LogicalDatastoreType store, final InstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) { - DOMStore potentialStore = datastores.get(store); + DOMStore potentialStore = getTxFactories().get(store); checkState(potentialStore != null, "Requested logical data store is not available."); return potentialStore.registerChangeListener(path, listener, triggeringScope); } - private ListenableFuture> submit( - final WriteTransactionImpl transaction) { - LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier()); - return executor.submit(new CommitCoordination(transaction)); - } - - private abstract static class AbstractCompositeTransaction implements - AsyncTransaction> { - - private final ImmutableMap backingTxs; - private final Object identifier; - - protected AbstractCompositeTransaction(final Object identifier, final ImmutableMap backingTxs) { - this.identifier = checkNotNull(identifier, "Identifier should not be null"); - this.backingTxs = checkNotNull(backingTxs, "Backing transactions should not be null"); - } - - protected T getSubtransaction(final K key) { - return backingTxs.get(key); - } - - public Iterable getSubtransactions() { - return backingTxs.values(); - } - - @Override - public Object getIdentifier() { - return identifier; - } - - @Override - public void close() { - try { - for (T subtransaction : backingTxs.values()) { - subtransaction.close(); - } - } catch (Exception e) { - throw new IllegalStateException("Uncaught exception occured during closing transaction.", e); - } - } - - } - - private static class ReadOnlyTransactionImpl extends - AbstractCompositeTransaction implements - DOMDataReadTransaction { - - protected ReadOnlyTransactionImpl(final Object identifier, - final ImmutableMap backingTxs) { - super(identifier, backingTxs); - } - - @Override - public ListenableFuture>> read(final LogicalDatastoreType store, - final InstanceIdentifier path) { - return getSubtransaction(store).read(path); - } - - } - - private static class WriteTransactionImpl extends - AbstractCompositeTransaction implements DOMDataWriteTransaction { - - private final DOMDataBrokerImpl broker; - private ImmutableList cohorts; - - protected WriteTransactionImpl(final Object identifier, final ImmutableMap backingTxs, - final DOMDataBrokerImpl broker) { - super(identifier, backingTxs); - this.broker = broker; - } - - public synchronized Iterable ready() { - checkState(cohorts == null, "Transaction was already marked as ready."); - ImmutableList.Builder cohortsBuilder = ImmutableList.builder(); - for (DOMStoreWriteTransaction subTx : getSubtransactions()) { - cohortsBuilder.add(subTx.ready()); - } - cohorts = cohortsBuilder.build(); - return cohorts; - } - - protected ImmutableList getCohorts() { - return cohorts; - } - - @Override - public void put(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode data) { - getSubtransaction(store).write(path, data); - } - - @Override - public void delete(final LogicalDatastoreType store, final InstanceIdentifier path) { - getSubtransaction(store).delete(path); - } - - @Override - public void merge(final LogicalDatastoreType store, final InstanceIdentifier path, - final NormalizedNode data) { - getSubtransaction(store).merge(path,data); - } - - @Override - public void cancel() { - // TODO Auto-generated method stub - - } - - @Override - public ListenableFuture> commit() { - - ready(); - return broker.submit(this); - } - - } - - private static class ReadWriteTransactionImpl extends WriteTransactionImpl implements - DOMDataReadWriteTransaction { - - protected ReadWriteTransactionImpl(final Object identifier, - final ImmutableMap backingTxs, - final DOMDataBrokerImpl broker) { - // super(identifier, backingTxs); - super(identifier, backingTxs, broker); - } - - @Override - public ListenableFuture>> read(final LogicalDatastoreType store, - final InstanceIdentifier path) { - return getSubtransaction(store).read(path); - } - } - - private final class CommitCoordination implements Callable> { - - private final WriteTransactionImpl transaction; - - public CommitCoordination(final WriteTransactionImpl transaction) { - this.transaction = transaction; - } - - @Override - public RpcResult call() throws Exception { - - try { - Boolean canCommit = canCommit().get(); - - if (canCommit) { - try { - preCommit().get(); - try { - commit().get(); - COORDINATOR_LOG.debug("Tx: {} Is commited.", transaction.getIdentifier()); - return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, - Collections. emptySet()); - - } catch (InterruptedException | ExecutionException e) { - COORDINATOR_LOG.error("Tx: {} Error during commit", transaction.getIdentifier(), e); - } - - } catch (InterruptedException | ExecutionException e) { - COORDINATOR_LOG.warn("Tx: {} Error during preCommit, starting Abort", - transaction.getIdentifier(), e); - } - } else { - COORDINATOR_LOG.info("Tx: {} Did not pass canCommit phase.", transaction.getIdentifier()); - abort().get(); - } - } catch (InterruptedException | ExecutionException e) { - COORDINATOR_LOG.warn("Tx: {} Error during canCommit, starting Abort", transaction.getIdentifier(), e); - - } - try { - abort().get(); - } catch (InterruptedException | ExecutionException e) { - COORDINATOR_LOG.error("Tx: {} Error during abort", transaction.getIdentifier(), e); - } - return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections. emptySet()); - } - - public ListenableFuture preCommit() { - COORDINATOR_LOG.debug("Transaction {}: PreCommit Started ", transaction.getIdentifier()); - Builder> ops = ImmutableList.builder(); - for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) { - ops.add(cohort.preCommit()); - } - return (ListenableFuture) Futures.allAsList(ops.build()); - } - - public ListenableFuture commit() { - COORDINATOR_LOG.debug("Transaction {}: Commit Started ", transaction.getIdentifier()); - Builder> ops = ImmutableList.builder(); - for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) { - ops.add(cohort.commit()); - } - return (ListenableFuture) Futures.allAsList(ops.build()); - } - - public ListenableFuture canCommit() { - COORDINATOR_LOG.debug("Transaction {}: CanCommit Started ", transaction.getIdentifier()); - Builder> canCommitOperations = ImmutableList.builder(); - for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) { - canCommitOperations.add(cohort.canCommit()); - } - ListenableFuture> allCanCommits = Futures.allAsList(canCommitOperations.build()); - return Futures.transform(allCanCommits, AND_FUNCTION); - } - - public ListenableFuture abort() { - COORDINATOR_LOG.debug("Transaction {}: Abort Started ", transaction.getIdentifier()); - Builder> ops = ImmutableList.builder(); - for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) { - ops.add(cohort.abort()); - } - return (ListenableFuture) Futures.allAsList(ops.build()); - }; + @Override + public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) { + ImmutableMap.Builder backingChainsBuilder = ImmutableMap + .builder(); + for (Entry entry : getTxFactories().entrySet()) { + backingChainsBuilder.put(entry.getKey(), entry.getValue().createTransactionChain()); + } + long chainId = chainNum.getAndIncrement(); + ImmutableMap backingChains = backingChainsBuilder.build(); + LOG.debug("Transactoin chain {} created with listener {}, backing store chains {}", chainId, listener, + backingChains); + return new DOMDataBrokerTransactionChainImpl(chainId, backingChains, coordinator, listener); } @Override - public void close() throws Exception { - + public ListenableFuture> commit(final DOMDataWriteTransaction transaction, + final Iterable cohorts) { + LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts); + return coordinator.submit(transaction, cohorts, Optional. absent()); } } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java new file mode 100644 index 0000000000..bcefc25ae8 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import java.util.concurrent.atomic.AtomicLong; + +import javax.annotation.concurrent.GuardedBy; + +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * NormalizedNode implementation of {@link TransactionChain} which is backed + * by several {@link DOMStoreTransactionChain} differentiated by provided + * {@link LogicalDatastoreType} type. + * + */ +public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory + implements DOMTransactionChain, DOMDataCommitErrorListener { + + private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerTransactionChainImpl.class); + private final DOMDataCommitExecutor coordinator; + private final TransactionChainListener listener; + private final long chainId; + private final AtomicLong txNum = new AtomicLong(); + @GuardedBy("this") + private boolean failed = false; + + /** + * + * @param chainId + * ID of transaction chain + * @param chains + * Backing {@link DOMStoreTransactionChain}s. + * @param coordinator + * Commit Coordinator which should be used to coordinate commits + * of transaction + * produced by this chain. + * @param listener + * Listener, which listens on transaction chain events. + * @throws NullPointerException + * If any of arguments is null. + */ + public DOMDataBrokerTransactionChainImpl(final long chainId, + final ImmutableMap chains, + final DOMDataCommitExecutor coordinator, final TransactionChainListener listener) { + super(chains); + this.chainId = chainId; + this.coordinator = Preconditions.checkNotNull(coordinator); + this.listener = Preconditions.checkNotNull(listener); + } + + @Override + protected Object newTransactionIdentifier() { + return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement(); + } + + @Override + public synchronized ListenableFuture> commit( + final DOMDataWriteTransaction transaction, final Iterable cohorts) { + return coordinator.submit(transaction, cohorts, Optional. of(this)); + } + + @Override + public synchronized void close() { + super.close(); + for (DOMStoreTransactionChain subChain : getTxFactories().values()) { + subChain.close(); + } + + if (!failed) { + LOG.debug("Transaction chain {} successfully finished.", this); + listener.onTransactionChainSuccessful(this); + } + } + + @Override + public synchronized void onCommitFailed(final DOMDataWriteTransaction tx, final Throwable cause) { + failed = true; + LOG.debug("Transaction chain {} failed.", this, cause); + listener.onTransactionChainFailed(this, tx, cause); + } +} diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java new file mode 100644 index 0000000000..540e2fe20c --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java @@ -0,0 +1,409 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; + +import javax.annotation.concurrent.GuardedBy; + +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; + +/** + * + * Implementation of blocking three phase commit coordinator, which which + * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}. + * + * This implementation does not support cancelation of commit, + * + * In order to advance to next phase of three phase commit all subtasks of + * previous step must be finish. + * + * This executor does not have an upper bound on subtask timeout. + * + * + */ +public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class); + + /** + * Runs AND binary operation between all booleans in supplied iteration of booleans. + * + * This method will stop evaluating iterables if first found is false. + */ + private static final Function, Boolean> AND_FUNCTION = new Function, Boolean>() { + + @Override + public Boolean apply(final Iterable input) { + for(boolean value : input) { + if(!value) { + return Boolean.FALSE; + } + } + return Boolean.TRUE; + } + }; + + private final ListeningExecutorService executor; + + /** + * + * Construct DOMDataCommitCoordinator which uses supplied executor to + * process commit coordinations. + * + * @param executor + */ + public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) { + this.executor = Preconditions.checkNotNull(executor, "executor must not be null."); + } + + @Override + public ListenableFuture> submit(final DOMDataWriteTransaction transaction, + final Iterable cohorts, final Optional listener) { + Preconditions.checkArgument(transaction != null, "Transaction must not be null."); + Preconditions.checkArgument(cohorts != null, "Cohorts must not be null."); + Preconditions.checkArgument(listener != null, "Listener must not be null"); + LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier()); + ListenableFuture> commitFuture = executor.submit(new CommitCoordinationTask( + transaction, cohorts, listener)); + if (listener.isPresent()) { + Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get())); + } + return commitFuture; + } + + /** + * + * Phase of 3PC commit + * + * Represents phase of 3PC Commit + * + * + */ + private static enum CommitPhase { + /** + * + * Commit Coordination Task is submitted for executing + * + */ + SUBMITTED, + /** + * Commit Coordination Task is in can commit phase of 3PC + * + */ + CAN_COMMIT, + /** + * Commit Coordination Task is in pre-commit phase of 3PC + * + */ + PRE_COMMIT, + /** + * Commit Coordination Task is in commit phase of 3PC + * + */ + COMMIT, + /** + * Commit Coordination Task is in abort phase of 3PC + * + */ + ABORT + } + + /** + * + * Implementation of blocking three-phase commit-coordination tasks without + * support of cancelation. + * + */ + private static class CommitCoordinationTask implements Callable> { + + private final DOMDataWriteTransaction tx; + private final Iterable cohorts; + + @GuardedBy("this") + private CommitPhase currentPhase; + + public CommitCoordinationTask(final DOMDataWriteTransaction transaction, + final Iterable cohorts, + final Optional listener) { + this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null"); + this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null"); + this.currentPhase = CommitPhase.SUBMITTED; + } + + @Override + public RpcResult call() throws TransactionCommitFailedException { + + try { + canCommitBlocking(); + preCommitBlocking(); + return commitBlocking(); + } catch (TransactionCommitFailedException e) { + LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e); + abortBlocking(e); + throw e; + } + } + + /** + * + * Invokes canCommit on underlying cohorts and blocks till + * all results are returned. + * + * Valid state transition is from SUBMITTED to CAN_COMMIT, + * if currentPhase is not SUBMITTED throws IllegalStateException. + * + * @throws TransactionCommitFailedException + * If one of cohorts failed can Commit + * + */ + private void canCommitBlocking() throws TransactionCommitFailedException { + final Boolean canCommitResult = canCommitAll().checkedGet(); + if (!canCommitResult) { + throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available."); + } + } + + /** + * + * Invokes preCommit on underlying cohorts and blocks till + * all results are returned. + * + * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current + * state is not CAN_COMMIT + * throws IllegalStateException. + * + * @throws TransactionCommitFailedException + * If one of cohorts failed preCommit + * + */ + private void preCommitBlocking() throws TransactionCommitFailedException { + preCommitAll().checkedGet(); + } + + /** + * + * Invokes commit on underlying cohorts and blocks till + * all results are returned. + * + * Valid state transition is from PRE_COMMIT to COMMIT, if not throws + * IllegalStateException. + * + * @throws TransactionCommitFailedException + * If one of cohorts failed preCommit + * + */ + private RpcResult commitBlocking() throws TransactionCommitFailedException { + commitAll().checkedGet(); + return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections. emptySet()); + } + + /** + * Aborts transaction. + * + * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all + * cohorts, blocks + * for all results. If any of the abort failed throws + * IllegalStateException, + * which will contains originalCause as suppressed Exception. + * + * If aborts we're successful throws supplied exception + * + * @param originalCause + * Exception which should be used to fail transaction for + * consumers of transaction + * future and listeners of transaction failure. + * @throws TransactionCommitFailedException + * on invocation of this method. + * originalCa + * @throws IllegalStateException + * if abort failed. + */ + private void abortBlocking(final TransactionCommitFailedException originalCause) + throws TransactionCommitFailedException { + LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause); + Exception cause = originalCause; + try { + abortAsyncAll().get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e); + cause = new IllegalStateException("Abort failed.", e); + cause.addSuppressed(e); + } + Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class); + } + + /** + * + * Invokes preCommit on underlying cohorts and returns future + * which will complete once all preCommit on cohorts completed or + * failed. + * + * + * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current + * state is not CAN_COMMIT + * throws IllegalStateException. + * + * @return Future which will complete once all cohorts completed + * preCommit. + * Future throws TransactionCommitFailedException + * If any of cohorts failed preCommit + * + */ + private CheckedFuture preCommitAll() { + changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT); + Builder> ops = ImmutableList.builder(); + for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops.add(cohort.preCommit()); + } + /* + * We are returing all futures as list, not only succeeded ones in + * order to fail composite future if any of them failed. + * See Futures.allAsList for this description. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); + return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER); + } + + /** + * + * Invokes commit on underlying cohorts and returns future which + * completes + * once all commits on cohorts are completed. + * + * Valid state transition is from PRE_COMMIT to COMMIT, if not throws + * IllegalStateException + * + * @return Future which will complete once all cohorts completed + * commit. + * Future throws TransactionCommitFailedException + * If any of cohorts failed preCommit + * + */ + private CheckedFuture commitAll() { + changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT); + Builder> ops = ImmutableList.builder(); + for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops.add(cohort.commit()); + } + /* + * We are returing all futures as list, not only succeeded ones in + * order to fail composite future if any of them failed. + * See Futures.allAsList for this description. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); + return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); + } + + /** + * + * Invokes canCommit on underlying cohorts and returns composite future + * which will contains {@link Boolean#TRUE} only and only if + * all cohorts returned true. + * + * Valid state transition is from SUBMITTED to CAN_COMMIT, + * if currentPhase is not SUBMITTED throws IllegalStateException. + * + * @return Future which will complete once all cohorts completed + * preCommit. + * Future throws TransactionCommitFailedException + * If any of cohorts failed preCommit + * + */ + private CheckedFuture canCommitAll() { + changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT); + Builder> canCommitOperations = ImmutableList.builder(); + for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { + canCommitOperations.add(cohort.canCommit()); + } + ListenableFuture> allCanCommits = Futures.allAsList(canCommitOperations.build()); + ListenableFuture allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION); + return Futures + .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER); + + } + + /** + * + * Invokes abort on underlying cohorts and returns future which + * completes + * once all abort on cohorts are completed. + * + * @return Future which will complete once all cohorts completed + * abort. + * + */ + private ListenableFuture abortAsyncAll() { + changeStateFrom(currentPhase, CommitPhase.ABORT); + Builder> ops = ImmutableList.builder(); + for (DOMStoreThreePhaseCommitCohort cohort : cohorts) { + ops.add(cohort.abort()); + } + /* + * We are returing all futures as list, not only succeeded ones in + * order to fail composite future if any of them failed. + * See Futures.allAsList for this description. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + ListenableFuture compositeResult = (ListenableFuture) Futures.allAsList(ops.build()); + return compositeResult; + } + + /** + * Change phase / state of transaction from expected value to new value + * + * This method checks state and updates state to new state of + * of this task if current state equals expected state. + * If expected state and current state are different raises + * IllegalStateException + * which means there is probably bug in implementation of commit + * coordination. + * + * If transition is successful, it logs transition on DEBUG level. + * + * @param currentExpected + * Required phase for change of state + * @param newState + * New Phase which will be entered by transaction. + * @throws IllegalStateException + * If currentState of task does not match expected state + */ + private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) { + Preconditions.checkState(currentPhase.equals(currentExpected), + "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(), + currentPhase, newState); + LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState); + currentPhase = newState; + }; + + } + +} diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java new file mode 100644 index 0000000000..811d4d8839 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.yangtools.yang.common.RpcResult; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; + +/** + * + * Utility implemetation of {@link FutureCallback} which is responsible + * for invoking {@link DOMDataCommitErrorListener} on TransactionCommit failed. + * + * When {@link #onFailure(Throwable)} is invoked, supplied {@link DOMDataCommitErrorListener} + * callback is invoked with associated transaction and throwable is invoked on listener. + * + */ +class DOMDataCommitErrorInvoker implements FutureCallback> { + + private final DOMDataWriteTransaction tx; + private final DOMDataCommitErrorListener listener; + + + /** + * + * Construct new DOMDataCommitErrorInvoker. + * + * @param transaction Transaction which should be passed as argument to {@link DOMDataCommitErrorListener#onCommitFailed(DOMDataWriteTransaction, Throwable)} + * @param listener Listener which should be invoked on error. + */ + public DOMDataCommitErrorInvoker(DOMDataWriteTransaction transaction, DOMDataCommitErrorListener listener) { + this.tx = Preconditions.checkNotNull(transaction, "Transaction must not be null"); + this.listener = Preconditions.checkNotNull(listener, "Listener must not be null"); + } + + @Override + public void onFailure(Throwable t) { + listener.onCommitFailed(tx, t); + } + + @Override + public void onSuccess(RpcResult result) { + // NOOP + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java new file mode 100644 index 0000000000..3a4b54eb2d --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import java.util.EventListener; + +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; + +/** + * + * Listener on transaction failure which may be passed to + * {@link DOMDataCommitExecutor}. This listener is notified during transaction + * processing, before result is delivered to other client code outside MD-SAL. + * This allows implementors to update their internal state before transaction + * failure is visible to client code. + * + * This is internal API for MD-SAL implementations, for consumer facing error + * listeners see {@link TransactionChainListener}. + * + */ +interface DOMDataCommitErrorListener extends EventListener { + + /** + * + * Callback which is invoked on transaction failure during three phase + * commit in {@link DOMDataCommitExecutor}. + * + * + * Implementation of this callback MUST NOT do any blocking calls or any + * calls to MD-SAL, since this callback is invoked synchronously on MD-SAL + * Broker coordination thread. + * + * @param tx + * Transaction which failed + * @param cause + * Failure reason + */ + void onCommitFailed(DOMDataWriteTransaction tx, Throwable cause); + +} diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java new file mode 100644 index 0000000000..f233912ea4 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.common.RpcResult; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * Executor of Three Phase Commit coordination for + * {@link DOMDataWriteTransaction} transactions. + * + * Implementations are responsible for executing implementation of three-phase + * commit protocol on supplied {@link DOMStoreThreePhaseCommitCohort}s. + * + * + */ +interface DOMDataCommitExecutor { + + /** + * Submits supplied transaction to be executed in context of provided + * cohorts. + * + * Transaction is used only as a context, cohorts should be associated with + * this transaction. + * + * @param tx + * Transaction to be used as context for reporting + * @param cohort + * DOM Store cohorts representing provided transaction, its + * subtransactoins. + * @param listener + * Error listener which should be notified if transaction failed. + * @return ListenableFuture which contains RpcResult with + * {@link TransactionStatus#COMMITED} if commit coordination on + * cohorts finished successfully. + * + */ + ListenableFuture> submit(DOMDataWriteTransaction tx, + Iterable cohort, Optional listener); + +} diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitImplementation.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitImplementation.java new file mode 100644 index 0000000000..ca2d711032 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitImplementation.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.common.RpcResult; + +import com.google.common.util.concurrent.ListenableFuture; + +/** + * + * Implementation prototype of commit method for + * {@link DOMForwardedWriteTransaction}. + * + */ +public interface DOMDataCommitImplementation { + + /** + * User-supplied implementation of {@link DOMDataWriteTransaction#commit()} + * for transaction. + * + * Callback invoked when {@link DOMDataWriteTransaction#commit()} is invoked + * on transaction created by this factory. + * + * @param transaction + * Transaction on which {@link DOMDataWriteTransaction#commit()} + * was invoked. + * @param cohorts + * Iteration of cohorts for subtransactions associated with + * commited transaction. + * + */ + ListenableFuture> commit(final DOMDataWriteTransaction transaction, + final Iterable cohorts); +} diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadOnlyTransaction.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadOnlyTransaction.java new file mode 100644 index 0000000000..be55911199 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadOnlyTransaction.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * + * Read Only Transaction, which is composed of several + * {@link DOMStoreReadTransaction} transactions. Subtransaction is selected by + * {@link LogicalDatastoreType} type parameter in + * {@link #read(LogicalDatastoreType, InstanceIdentifier)}. + */ +class DOMForwardedReadOnlyTransaction extends + AbstractDOMForwardedCompositeTransaction implements + DOMDataReadTransaction { + + protected DOMForwardedReadOnlyTransaction(final Object identifier, + final ImmutableMap backingTxs) { + super(identifier, backingTxs); + } + + @Override + public ListenableFuture>> read(final LogicalDatastoreType store, + final InstanceIdentifier path) { + return getSubtransaction(store).read(path); + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java new file mode 100644 index 0000000000..956e169333 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedReadWriteTransaction.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */package org.opendaylight.controller.md.sal.dom.broker.impl; + +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * + * Read-Write Transaction, which is composed of several + * {@link DOMStoreReadTransaction} transactions. Subtransaction is selected by + * {@link LogicalDatastoreType} type parameter in: + * + *
    + *
  • {@link #read(LogicalDatastoreType, InstanceIdentifier)} + *
  • {@link #put(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)} + *
  • {@link #delete(LogicalDatastoreType, InstanceIdentifier)} + *
  • {@link #merge(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)} + *
+ * {@link #commit()} will result in invocation of + * {@link DOMDataCommitImplementation#commit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)} + * invocation with all {@link DOMStoreThreePhaseCommitCohort} for underlying + * transactions. + * + */ +class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction implements + DOMDataReadWriteTransaction { + + protected DOMForwardedReadWriteTransaction(final Object identifier, + final ImmutableMap backingTxs, + final DOMDataCommitImplementation commitImpl) { + super(identifier, backingTxs, commitImpl); + } + + @Override + public ListenableFuture>> read(final LogicalDatastoreType store, + final InstanceIdentifier path) { + return getSubtransaction(store).read(path); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedWriteTransaction.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedWriteTransaction.java new file mode 100644 index 0000000000..199438fc87 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMForwardedWriteTransaction.java @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import static com.google.common.base.Preconditions.checkState; + +import javax.annotation.concurrent.GuardedBy; + +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * /** + * + * Read-Write Transaction, which is composed of several + * {@link DOMStoreReadTransaction} transactions. Subtransaction is selected by + * {@link LogicalDatastoreType} type parameter in: + * + *
    + *
  • {@link #put(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)} + *
  • {@link #delete(LogicalDatastoreType, InstanceIdentifier)} + *
  • {@link #merge(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)} + *
+ *

+ * {@link #commit()} will result in invocation of + * {@link DOMDataCommitImplementation#commit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)} + * invocation with all {@link DOMStoreThreePhaseCommitCohort} for underlying + * transactions. + * + * @param + * Subtype of {@link DOMStoreWriteTransaction} which is used as + * subtransaction. + */ +class DOMForwardedWriteTransaction extends + AbstractDOMForwardedCompositeTransaction implements DOMDataWriteTransaction { + + @GuardedBy("this") + private DOMDataCommitImplementation commitImpl; + + @GuardedBy("this") + private boolean canceled; + @GuardedBy("this") + private ListenableFuture> commitFuture; + + protected DOMForwardedWriteTransaction(final Object identifier, + final ImmutableMap backingTxs, final DOMDataCommitImplementation commitImpl) { + super(identifier, backingTxs); + this.commitImpl = Preconditions.checkNotNull(commitImpl, "commitImpl must not be null."); + } + + @Override + public void put(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode data) { + checkNotReady(); + getSubtransaction(store).write(path, data); + } + + @Override + public void delete(final LogicalDatastoreType store, final InstanceIdentifier path) { + checkNotReady(); + getSubtransaction(store).delete(path); + } + + @Override + public void merge(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode data) { + checkNotReady(); + getSubtransaction(store).merge(path, data); + } + + @Override + public synchronized void cancel() { + checkState(!canceled, "Transaction was canceled."); + if (commitFuture != null) { + // FIXME: Implement cancelation of commit future + // when Broker impl will support cancelation. + throw new UnsupportedOperationException("Not implemented yet."); + } + canceled = true; + commitImpl = null; + + } + + @Override + public synchronized ListenableFuture> commit() { + checkNotReady(); + + ImmutableList.Builder cohortsBuilder = ImmutableList.builder(); + for (DOMStoreWriteTransaction subTx : getSubtransactions()) { + cohortsBuilder.add(subTx.ready()); + } + ImmutableList cohorts = cohortsBuilder.build(); + commitFuture = commitImpl.commit(this, cohorts); + return commitFuture; + } + + private void checkNotReady() { + checkNotCanceled(); + checkNotCommited(); + } + + private void checkNotCanceled() { + Preconditions.checkState(!canceled, "Transaction was canceled."); + } + + private void checkNotCommited() { + checkState(commitFuture == null, "Transaction was already commited."); + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/TransactionCommitFailedExceptionMapper.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/TransactionCommitFailedExceptionMapper.java new file mode 100644 index 0000000000..87bd6c8c60 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/TransactionCommitFailedExceptionMapper.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import java.util.concurrent.ExecutionException; + +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; + +/** + * + * Utility exception mapper which translates {@link Exception} + * to {@link TransactionCommitFailedException}. + * + * This mapper is intended to be used with {@link Futures#makeChecked(com.google.common.util.concurrent.ListenableFuture, Function)} + *

    + *
  • if exception is {@link TransactionCommitFailedException} or one of its subclasses returns original exception. + *
  • if exception is {@link ExecutionException} and cause is {@link TransactionCommitFailedException} return cause + *
  • otherwise returns {@link TransactionCommitFailedException} with original exception as a cause. + *
+ * + */ +final class TransactionCommitFailedExceptionMapper implements + Function { + + static final TransactionCommitFailedExceptionMapper PRE_COMMIT_MAPPER = create("canCommit"); + + static final TransactionCommitFailedExceptionMapper CAN_COMMIT_ERROR_MAPPER = create("preCommit"); + + static final TransactionCommitFailedExceptionMapper COMMIT_ERROR_MAPPER = create("commit"); + + private final String opName; + + private TransactionCommitFailedExceptionMapper(final String opName) { + this.opName = Preconditions.checkNotNull(opName); + } + + public static final TransactionCommitFailedExceptionMapper create(final String opName) { + return new TransactionCommitFailedExceptionMapper(opName); + } + + @Override + public TransactionCommitFailedException apply(final Exception e) { + // If excetion is TransactionCommitFailedException + // we reuse it directly. + if (e instanceof TransactionCommitFailedException) { + return (TransactionCommitFailedException) e; + } + // If error is ExecutionException which was caused by cause of + // TransactionCommitFailedException + // we reuse original cause + if (e instanceof ExecutionException && e.getCause() instanceof TransactionCommitFailedException) { + return (TransactionCommitFailedException) e.getCause(); + } + if (e instanceof InterruptedException) { + return new TransactionCommitFailedException(opName + " failed - DOMStore was interupted.", e); + } + // Otherwise we are using new exception, with original cause + return new TransactionCommitFailedException(opName + " failed", e); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/osgi/DOMDataBrokerProxy.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/osgi/DOMDataBrokerProxy.java index 70db71f3ac..b0ccfb995d 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/osgi/DOMDataBrokerProxy.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/osgi/DOMDataBrokerProxy.java @@ -1,11 +1,13 @@ package org.opendaylight.controller.sal.dom.broker.osgi; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.osgi.framework.ServiceReference; @@ -38,4 +40,9 @@ public class DOMDataBrokerProxy extends AbstractBrokerServiceProxy failFuture = SettableFuture.create(); + private final SettableFuture successFuture = SettableFuture.create(); + + @Override + public void onTransactionChainFailed(TransactionChain chain, AsyncTransaction transaction, + Throwable cause) { + failFuture.set(cause); + } + + @Override + public void onTransactionChainSuccessful(TransactionChain chain) { + successFuture.set(null); + } + + public SettableFuture getFailFuture() { + return failFuture; + } + + public SettableFuture getSuccessFuture() { + return successFuture; + } + +} diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMTransactionChainTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMTransactionChainTest.java new file mode 100644 index 0000000000..b360cb1694 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMTransactionChainTest.java @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.broker.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION; +import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.md.sal.dom.store.impl.TestModel; +import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +public class DOMTransactionChainTest { + + private SchemaContext schemaContext; + private DOMDataBrokerImpl domBroker; + + @Before + public void setupStore() { + InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); + InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor()); + schemaContext = TestModel.createTestContext(); + + operStore.onGlobalContextUpdated(schemaContext); + configStore.onGlobalContextUpdated(schemaContext); + + ImmutableMap stores = ImmutableMap. builder() // + .put(CONFIGURATION, configStore) // + .put(OPERATIONAL, operStore) // + .build(); + + ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + domBroker = new DOMDataBrokerImpl(stores, executor); + } + + @Test + public void testTransactionChainNoConflict() throws InterruptedException, ExecutionException, TimeoutException { + BlockingTransactionChainListener listener = new BlockingTransactionChainListener(); + DOMTransactionChain txChain = domBroker.createTransactionChain(listener); + assertNotNull(txChain); + + /** + * We alocate new read-write transaction and write /test + * + * + */ + DOMDataReadWriteTransaction firstTx = allocateAndWrite(txChain); + + /** + * First transaction is marked as ready, we are able to allocate chained + * transactions + */ + ListenableFuture> firstWriteTxFuture = firstTx.commit(); + + /** + * We alocate chained transaction - read transaction. + */ + DOMDataReadTransaction secondReadTx = txChain.newReadOnlyTransaction(); + + /** + * + * We test if we are able to read data from tx, read should not fail + * since we are using chained transaction. + * + * + */ + assertTestContainerExists(secondReadTx); + + /** + * + * We alocate next transaction, which is still based on first one, but + * is read-write. + * + */ + DOMDataReadWriteTransaction thirdDeleteTx = allocateAndDelete(txChain); + + /** + * third transaction is sealed. + */ + ListenableFuture> thirdDeleteTxFuture = thirdDeleteTx.commit(); + + /** + * We commit first transaction + * + */ + assertCommitSuccessful(firstWriteTxFuture); + + // Alocates store transaction + DOMDataReadTransaction storeReadTx = domBroker.newReadOnlyTransaction(); + /** + * We verify transaction is commited to store, container should exists + * in datastore. + */ + assertTestContainerExists(storeReadTx); + /** + * We commit third transaction + * + */ + assertCommitSuccessful(thirdDeleteTxFuture); + + /** + * We close transaction chain. + */ + txChain.close(); + + listener.getSuccessFuture().get(1000, TimeUnit.MILLISECONDS); + } + + @Test + public void testTransactionChainNotSealed() throws InterruptedException, ExecutionException, TimeoutException { + BlockingTransactionChainListener listener = new BlockingTransactionChainListener(); + DOMTransactionChain txChain = domBroker.createTransactionChain(listener); + assertNotNull(txChain); + + /** + * We alocate new read-write transaction and write /test + * + * + */ + allocateAndWrite(txChain); + + /** + * We alocate chained transaction - read transaction, note first one is + * still not commited to datastore, so this allocation should fail with + * IllegalStateException. + */ + try { + DOMDataReadTransaction secondReadTx = txChain.newReadOnlyTransaction(); + fail("Allocation of secondReadTx should fail with IllegalStateException"); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + } + } + + private static DOMDataReadWriteTransaction allocateAndDelete(DOMTransactionChain txChain) + throws InterruptedException, ExecutionException { + DOMDataReadWriteTransaction tx = txChain.newReadWriteTransaction(); + + /** + * We test existence of /test in third transaction container should + * still be visible from first one (which is still uncommmited). + * + */ + assertTestContainerExists(tx); + + /** + * We delete node in third transaction + */ + tx.delete(LogicalDatastoreType.OPERATIONAL, TestModel.TEST_PATH); + return tx; + } + + private static DOMDataReadWriteTransaction allocateAndWrite(DOMTransactionChain txChain) + throws InterruptedException, ExecutionException { + DOMDataReadWriteTransaction tx = txChain.newReadWriteTransaction(); + assertTestContainerWrite(tx); + return tx; + } + + private static void assertCommitSuccessful(ListenableFuture> future) + throws InterruptedException, ExecutionException { + RpcResult rpcResult = future.get(); + assertTrue(rpcResult.isSuccessful()); + assertEquals(TransactionStatus.COMMITED, rpcResult.getResult()); + } + + private static void assertTestContainerExists(DOMDataReadTransaction readTx) throws InterruptedException, + ExecutionException { + ListenableFuture>> readFuture = readTx.read(OPERATIONAL, TestModel.TEST_PATH); + Optional> readedData = readFuture.get(); + assertTrue(readedData.isPresent()); + } + + private static void assertTestContainerWrite(DOMDataReadWriteTransaction tx) throws InterruptedException, + ExecutionException { + tx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + assertTestContainerExists(tx); + } +}