--- /dev/null
+package org.opendaylight.controller.md.sal.common.api.data;
+
+/**
+*
+* Failure of asynchronous transaction commit caused by failure
+* of optimistic locking.
+*
+* This exception is raised and returned when transaction commit
+* failed, because other transaction finished successfully
+* and modified same data as failed transaction.
+*
+* Clients may recover from this error condition by
+* retrieving current state and submitting new updated
+* transaction.
+*
+*/
+public class OptimisticLockFailedException extends TransactionCommitFailedException {
+
+ private static final long serialVersionUID = 1L;
+
+ protected OptimisticLockFailedException(final String message, final Throwable cause, final boolean enableSuppression,
+ final boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+ public OptimisticLockFailedException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ public OptimisticLockFailedException(final String message) {
+ super(message);
+ }
+
+}
/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
package org.opendaylight.controller.md.sal.dom.api;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainFactory;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.sal.core.api.BrokerService;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public interface DOMDataBroker extends AsyncDataBroker<InstanceIdentifier, NormalizedNode<?, ?>, DOMDataChangeListener>, BrokerService {
+/**
+ * Data Broker which provides data transaction and data change listener fuctionality
+ * using {@link NormalizedNode} data format.
+ *
+ * This interface is type capture of generic interfaces and returns type captures
+ * of results for client-code convenience.
+ *
+ */
+public interface DOMDataBroker extends
+ AsyncDataBroker<InstanceIdentifier, NormalizedNode<?, ?>, DOMDataChangeListener>,
+ TransactionChainFactory<InstanceIdentifier, NormalizedNode<?, ?>>, BrokerService {
+
+ /**
+ * {@inheritDoc}
+ */
@Override
DOMDataReadTransaction newReadOnlyTransaction();
+ /**
+ * {@inheritDoc}
+ */
@Override
DOMDataReadWriteTransaction newReadWriteTransaction();
+ /**
+ * {@inheritDoc}
+ */
@Override
DOMDataWriteTransaction newWriteOnlyTransaction();
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ DOMTransactionChain createTransactionChain(TransactionChainListener listener);
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * A chain of DOM Data transactions.
+ *
+ * Transactions in a chain need to be committed in sequence and each
+ * transaction should see the effects of previous transactions as if they happened. A chain
+ * makes no guarantees of atomicity, in fact transactions are committed as soon as possible.
+ *
+ * <p>
+ * This interface is type capture of {@link TransactionChain} for DOM Data Contracts.
+ */
+public interface DOMTransactionChain extends TransactionChain<InstanceIdentifier, NormalizedNode<?, ?>> {
+
+ @Override
+ DOMDataReadTransaction newReadOnlyTransaction();
+
+ @Override
+ DOMDataReadWriteTransaction newReadWriteTransaction();
+
+ @Override
+ DOMDataWriteTransaction newWriteOnlyTransaction();
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Composite DOM Transaction backed by {@link DOMStoreTransaction}.
+ *
+ * Abstract base for composite transaction, which provides access only to common
+ * functionality as retrieval of subtransaction, close method and retrieval of
+ * identifier.
+ *
+ * @param <K>
+ * Subtransaction distinguisher
+ * @param <T>
+ * Subtransaction type
+ */
+abstract class AbstractDOMForwardedCompositeTransaction<K, T extends DOMStoreTransaction> implements
+ AsyncTransaction<InstanceIdentifier, NormalizedNode<?, ?>> {
+
+ private final ImmutableMap<K, T> backingTxs;
+ private final Object identifier;
+
+ /**
+ *
+ * Creates new composite Transactions.
+ *
+ * @param identifier
+ * Identifier of transaction.
+ * @param backingTxs
+ * Key,value map of backing transactions.
+ */
+ protected AbstractDOMForwardedCompositeTransaction(final Object identifier, final ImmutableMap<K, T> backingTxs) {
+ this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null");
+ this.backingTxs = Preconditions.checkNotNull(backingTxs, "Backing transactions should not be null");
+ }
+
+ /**
+ * Returns subtransaction associated with supplied key.
+ *
+ * @param key
+ * @return
+ * @throws NullPointerException
+ * if key is null
+ * @throws IllegalArgumentException
+ * if no subtransaction is associated with key.
+ */
+ protected final T getSubtransaction(final K key) {
+ Preconditions.checkNotNull(key, "key must not be null.");
+ Preconditions.checkArgument(backingTxs.containsKey(key), "No subtransaction associated with %s", key);
+ return backingTxs.get(key);
+ }
+
+ /**
+ * Returns immutable Iterable of all subtransactions.
+ *
+ */
+ protected Iterable<T> getSubtransactions() {
+ return backingTxs.values();
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public void close() {
+ /*
+ * We share one exception for all failures, which are added
+ * as supressedExceptions to it.
+ *
+ */
+ IllegalStateException failure = null;
+ for (T subtransaction : backingTxs.values()) {
+ try {
+ subtransaction.close();
+ } catch (Exception e) {
+ // If we did not allocated failure we allocate it
+ if(failure == null) {
+ failure = new IllegalStateException("Uncaught exception occured during closing transaction.", e);
+ } else {
+ // We update it with addotional exceptions, which occured during error.
+ failure.addSuppressed(e);
+ }
+ }
+ }
+ // If we have failure, we throw it at after all attempts to close.
+ if(failure != null) {
+ throw failure;
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ *
+ * Abstract composite transaction factory.
+ *
+ * Provides an convenience common implementation for composite DOM Transactions,
+ * where subtransaction is identified by {@link LogicalDatastoreType} type and
+ * implementation of subtransaction is provided by
+ * {@link DOMStoreTransactionFactory}.
+ *
+ * <b>Note:</b>This class does not have thread-safe implementation of {@link #close()},
+ * implementation may allow accessing and allocating new transactions during closing
+ * this instance.
+ *
+ * @param <T>
+ * Type of {@link DOMStoreTransactionFactory} factory.
+ */
+public abstract class AbstractDOMForwardedTransactionFactory<T extends DOMStoreTransactionFactory> implements DOMDataCommitImplementation, AutoCloseable {
+
+ private final ImmutableMap<LogicalDatastoreType, T> storeTxFactories;
+
+ private boolean closed;
+
+ protected AbstractDOMForwardedTransactionFactory(final Map<LogicalDatastoreType, ? extends T> txFactories) {
+ this.storeTxFactories = ImmutableMap.copyOf(txFactories);
+ }
+
+ /**
+ * Implementations must return unique identifier for each and every call of
+ * this method;
+ *
+ * @return new Unique transaction identifier.
+ */
+ protected abstract Object newTransactionIdentifier();
+
+ /**
+ * Creates a new composite read-only transaction
+ *
+ * Creates a new composite read-only transaction backed by one transaction
+ * per factory in {@link #getTxFactories()}.
+ *
+ * Subtransaction for reading is selected by supplied
+ * {@link LogicalDatastoreType} as parameter for
+ * {@link DOMDataReadTransaction#read(LogicalDatastoreType, InstanceIdentifier)}
+ * .
+ *
+ * Id of returned transaction is retrieved via
+ * {@link #newTransactionIdentifier()}.
+ *
+ * @return New composite read-only transaction.
+ */
+ public DOMDataReadTransaction newReadOnlyTransaction() {
+ checkNotClosed();
+ ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadTransaction> builder = ImmutableMap.builder();
+ for (Entry<LogicalDatastoreType, T> store : storeTxFactories.entrySet()) {
+ builder.put(store.getKey(), store.getValue().newReadOnlyTransaction());
+ }
+ return new DOMForwardedReadOnlyTransaction(newTransactionIdentifier(), builder.build());
+ }
+
+
+
+ /**
+ * Creates a new composite write-only transaction
+ *
+ * <p>
+ * Creates a new composite write-only transaction backed by one write-only
+ * transaction per factory in {@link #getTxFactories()}.
+ *
+ * <p>
+ * Implementation of composite Write-only transaction is following:
+ *
+ * <ul>
+ * <li>
+ * {@link DOMDataWriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+ * - backing subtransaction is selected by {@link LogicalDatastoreType},
+ * {@link DOMStoreWriteTransaction#write(InstanceIdentifier, NormalizedNode)}
+ * is invoked on selected subtransaction.
+ * <li>
+ * {@link DOMDataWriteTransaction#merge(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+ * - backing subtransaction is selected by {@link LogicalDatastoreType},
+ * {@link DOMStoreWriteTransaction#merge(InstanceIdentifier, NormalizedNode)}
+ * is invoked on selected subtransaction.
+ * <li>
+ * {@link DOMDataWriteTransaction#delete(LogicalDatastoreType, InstanceIdentifier)
+ * - backing subtransaction is selected by {@link LogicalDatastoreType},
+ * {@link DOMStoreWriteTransaction#delete(InstanceIdentifier)} is invoked on
+ * selected subtransaction.
+ * <li> {@link DOMDataWriteTransaction#commit()} - results in invoking
+ * {@link DOMStoreWriteTransaction#ready()}, gathering all resulting cohorts
+ * and then invoking finalized implementation callback
+ * {@link #commit(DOMDataWriteTransaction, Iterable)} with transaction which
+ * was commited and gathered results.
+ * </ul>
+ *
+ * Id of returned transaction is generated via
+ * {@link #newTransactionIdentifier()}.
+ *
+ * @return New composite write-only transaction associated with this
+ * factory.
+ */
+ public DOMDataWriteTransaction newWriteOnlyTransaction() {
+ checkNotClosed();
+ ImmutableMap.Builder<LogicalDatastoreType, DOMStoreWriteTransaction> builder = ImmutableMap.builder();
+ for (Entry<LogicalDatastoreType, T> store : storeTxFactories.entrySet()) {
+ builder.put(store.getKey(), store.getValue().newWriteOnlyTransaction());
+ }
+ return new DOMForwardedWriteTransaction<DOMStoreWriteTransaction>(newTransactionIdentifier(), builder.build(),
+ this);
+ }
+
+ /**
+ * Creates a new composite write-only transaction
+ *
+ * <p>
+ * Creates a new composite write-only transaction backed by one write-only
+ * transaction per factory in {@link #getTxFactories()}.
+ * <p>
+ * Implementation of composite Write-only transaction is following:
+ *
+ * <ul>
+ * <li>
+ * {@link DOMDataWriteTransaction#read(LogicalDatastoreType, InstanceIdentifier)}
+ * - backing subtransaction is selected by {@link LogicalDatastoreType},
+ * {@link DOMStoreWriteTransaction#read(InstanceIdentifier)} is invoked on
+ * selected subtransaction.
+ * <li>
+ * {@link DOMDataWriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+ * - backing subtransaction is selected by {@link LogicalDatastoreType},
+ * {@link DOMStoreWriteTransaction#write(InstanceIdentifier, NormalizedNode)}
+ * is invoked on selected subtransaction.
+ * <li>
+ * {@link DOMDataWriteTransaction#merge(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+ * - backing subtransaction is selected by {@link LogicalDatastoreType},
+ * {@link DOMStoreWriteTransaction#merge(InstanceIdentifier, NormalizedNode)}
+ * is invoked on selected subtransaction.
+ * <li>
+ * {@link DOMDataWriteTransaction#delete(LogicalDatastoreType, InstanceIdentifier)
+ * - backing subtransaction is selected by {@link LogicalDatastoreType},
+ * {@link DOMStoreWriteTransaction#delete(InstanceIdentifier)} is invoked on
+ * selected subtransaction.
+ * <li> {@link DOMDataWriteTransaction#commit()} - results in invoking
+ * {@link DOMStoreWriteTransaction#ready()}, gathering all resulting cohorts
+ * and then invoking finalized implementation callback
+ * {@link #commit(DOMDataWriteTransaction, Iterable)} with transaction which
+ * was commited and gathered results.
+ * <li>
+ * </ul>
+ *
+ * Id of returned transaction is generated via
+ * {@link #newTransactionIdentifier()}.
+ *
+ * @return New composite read-write transaction associated with this
+ * factory.
+ *
+ */
+ public DOMDataReadWriteTransaction newReadWriteTransaction() {
+ checkNotClosed();
+ ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadWriteTransaction> builder = ImmutableMap.builder();
+ for (Entry<LogicalDatastoreType, T> store : storeTxFactories.entrySet()) {
+ builder.put(store.getKey(), store.getValue().newReadWriteTransaction());
+ }
+ return new DOMForwardedReadWriteTransaction(newTransactionIdentifier(), builder.build(), this);
+ }
+
+ /**
+ * Convenience accessor of backing factories intended to be used only by
+ * finalization of this class.
+ *
+ * <b>Note:</b>
+ * Finalization of this class may want to access other functionality of
+ * supplied Transaction factories.
+ *
+ * @return Map of backing transaction factories.
+ */
+ protected final Map<LogicalDatastoreType, T> getTxFactories() {
+ return storeTxFactories;
+ }
+
+ /**
+ *
+ * Checks if instance is not closed.
+ *
+ * @throws IllegalStateException If instance of this class was closed.
+ *
+ */
+ @GuardedBy("this")
+ protected synchronized void checkNotClosed() {
+ Preconditions.checkState(!closed,"Transaction factory was closed. No further operations allowed.");
+ }
+
+ @Override
+ @GuardedBy("this")
+ public synchronized void close() {
+ closed = true;
+ }
+
+}
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import java.util.Collections;
-import java.util.List;
import java.util.Map.Entry;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
-public class DOMDataBrokerImpl implements DOMDataBroker, AutoCloseable {
+public class DOMDataBrokerImpl extends AbstractDOMForwardedTransactionFactory<DOMStore> implements DOMDataBroker,
+ AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerImpl.class);
- private static final Logger COORDINATOR_LOG = LoggerFactory.getLogger(CommitCoordination.class);
- private final ImmutableMap<LogicalDatastoreType, DOMStore> datastores;
- private final ListeningExecutorService executor;
+
+ private final DOMDataCommitCoordinatorImpl coordinator;
private final AtomicLong txNum = new AtomicLong();
+ private final AtomicLong chainNum = new AtomicLong();
public DOMDataBrokerImpl(final ImmutableMap<LogicalDatastoreType, DOMStore> datastores,
final ListeningExecutorService executor) {
- super();
- this.datastores = datastores;
- this.executor = executor;
+ super(datastores);
+ this.coordinator = new DOMDataCommitCoordinatorImpl(executor);
}
- private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
-
- @Override
- public Boolean apply(final Iterable<Boolean> input) {
-
- for (Boolean value : input) {
- if (value == false) {
- return Boolean.FALSE;
- }
- }
- return Boolean.TRUE;
- }
- };
-
@Override
- public DOMDataReadTransaction newReadOnlyTransaction() {
- ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadTransaction> builder = ImmutableMap.builder();
- for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
- builder.put(store.getKey(), store.getValue().newReadOnlyTransaction());
- }
- return new ReadOnlyTransactionImpl(newTransactionIdentifier(), builder.build());
- }
-
- private Object newTransactionIdentifier() {
+ protected Object newTransactionIdentifier() {
return "DOM-" + txNum.getAndIncrement();
}
- @Override
- public DOMDataReadWriteTransaction newReadWriteTransaction() {
- ImmutableMap.Builder<LogicalDatastoreType, DOMStoreReadWriteTransaction> builder = ImmutableMap.builder();
- for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
- builder.put(store.getKey(), store.getValue().newReadWriteTransaction());
- }
- return new ReadWriteTransactionImpl(newTransactionIdentifier(), builder.build(), this);
- }
-
- @Override
- public DOMDataWriteTransaction newWriteOnlyTransaction() {
- ImmutableMap.Builder<LogicalDatastoreType, DOMStoreWriteTransaction> builder = ImmutableMap.builder();
- for (Entry<LogicalDatastoreType, DOMStore> store : datastores.entrySet()) {
- builder.put(store.getKey(), store.getValue().newWriteOnlyTransaction());
- }
- return new WriteTransactionImpl<DOMStoreWriteTransaction>(newTransactionIdentifier(), builder.build(), this);
- }
-
@Override
public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
final InstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) {
- DOMStore potentialStore = datastores.get(store);
+ DOMStore potentialStore = getTxFactories().get(store);
checkState(potentialStore != null, "Requested logical data store is not available.");
return potentialStore.registerChangeListener(path, listener, triggeringScope);
}
- private ListenableFuture<RpcResult<TransactionStatus>> submit(
- final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction) {
- LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
- return executor.submit(new CommitCoordination(transaction));
- }
-
- private abstract static class AbstractCompositeTransaction<K, T extends DOMStoreTransaction> implements
- AsyncTransaction<InstanceIdentifier, NormalizedNode<?, ?>> {
-
- private final ImmutableMap<K, T> backingTxs;
- private final Object identifier;
-
- protected AbstractCompositeTransaction(final Object identifier, final ImmutableMap<K, T> backingTxs) {
- this.identifier = checkNotNull(identifier, "Identifier should not be null");
- this.backingTxs = checkNotNull(backingTxs, "Backing transactions should not be null");
- }
-
- protected T getSubtransaction(final K key) {
- return backingTxs.get(key);
- }
-
- public Iterable<T> getSubtransactions() {
- return backingTxs.values();
- }
-
- @Override
- public Object getIdentifier() {
- return identifier;
- }
-
- @Override
- public void close() {
- try {
- for (T subtransaction : backingTxs.values()) {
- subtransaction.close();
- }
- } catch (Exception e) {
- throw new IllegalStateException("Uncaught exception occured during closing transaction.", e);
- }
- }
-
- }
-
- private static class ReadOnlyTransactionImpl extends
- AbstractCompositeTransaction<LogicalDatastoreType, DOMStoreReadTransaction> implements
- DOMDataReadTransaction {
-
- protected ReadOnlyTransactionImpl(final Object identifier,
- final ImmutableMap<LogicalDatastoreType, DOMStoreReadTransaction> backingTxs) {
- super(identifier, backingTxs);
- }
-
- @Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
- final InstanceIdentifier path) {
- return getSubtransaction(store).read(path);
- }
-
- }
-
- private static class WriteTransactionImpl<T extends DOMStoreWriteTransaction> extends
- AbstractCompositeTransaction<LogicalDatastoreType, T> implements DOMDataWriteTransaction {
-
- private final DOMDataBrokerImpl broker;
- private ImmutableList<DOMStoreThreePhaseCommitCohort> cohorts;
-
- protected WriteTransactionImpl(final Object identifier, final ImmutableMap<LogicalDatastoreType, T> backingTxs,
- final DOMDataBrokerImpl broker) {
- super(identifier, backingTxs);
- this.broker = broker;
- }
-
- public synchronized Iterable<DOMStoreThreePhaseCommitCohort> ready() {
- checkState(cohorts == null, "Transaction was already marked as ready.");
- ImmutableList.Builder<DOMStoreThreePhaseCommitCohort> cohortsBuilder = ImmutableList.builder();
- for (DOMStoreWriteTransaction subTx : getSubtransactions()) {
- cohortsBuilder.add(subTx.ready());
- }
- cohorts = cohortsBuilder.build();
- return cohorts;
- }
-
- protected ImmutableList<DOMStoreThreePhaseCommitCohort> getCohorts() {
- return cohorts;
- }
-
- @Override
- public void put(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
- getSubtransaction(store).write(path, data);
- }
-
- @Override
- public void delete(final LogicalDatastoreType store, final InstanceIdentifier path) {
- getSubtransaction(store).delete(path);
- }
-
- @Override
- public void merge(final LogicalDatastoreType store, final InstanceIdentifier path,
- final NormalizedNode<?, ?> data) {
- getSubtransaction(store).merge(path,data);
- }
-
- @Override
- public void cancel() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public ListenableFuture<RpcResult<TransactionStatus>> commit() {
-
- ready();
- return broker.submit(this);
- }
-
- }
-
- private static class ReadWriteTransactionImpl extends WriteTransactionImpl<DOMStoreReadWriteTransaction> implements
- DOMDataReadWriteTransaction {
-
- protected ReadWriteTransactionImpl(final Object identifier,
- final ImmutableMap<LogicalDatastoreType, DOMStoreReadWriteTransaction> backingTxs,
- final DOMDataBrokerImpl broker) {
- // super(identifier, backingTxs);
- super(identifier, backingTxs, broker);
- }
-
- @Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
- final InstanceIdentifier path) {
- return getSubtransaction(store).read(path);
- }
- }
-
- private final class CommitCoordination implements Callable<RpcResult<TransactionStatus>> {
-
- private final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction;
-
- public CommitCoordination(final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction) {
- this.transaction = transaction;
- }
-
- @Override
- public RpcResult<TransactionStatus> call() throws Exception {
-
- try {
- Boolean canCommit = canCommit().get();
-
- if (canCommit) {
- try {
- preCommit().get();
- try {
- commit().get();
- COORDINATOR_LOG.debug("Tx: {} Is commited.", transaction.getIdentifier());
- return Rpcs.getRpcResult(true, TransactionStatus.COMMITED,
- Collections.<RpcError> emptySet());
-
- } catch (InterruptedException | ExecutionException e) {
- COORDINATOR_LOG.error("Tx: {} Error during commit", transaction.getIdentifier(), e);
- }
-
- } catch (InterruptedException | ExecutionException e) {
- COORDINATOR_LOG.warn("Tx: {} Error during preCommit, starting Abort",
- transaction.getIdentifier(), e);
- }
- } else {
- COORDINATOR_LOG.info("Tx: {} Did not pass canCommit phase.", transaction.getIdentifier());
- abort().get();
- }
- } catch (InterruptedException | ExecutionException e) {
- COORDINATOR_LOG.warn("Tx: {} Error during canCommit, starting Abort", transaction.getIdentifier(), e);
-
- }
- try {
- abort().get();
- } catch (InterruptedException | ExecutionException e) {
- COORDINATOR_LOG.error("Tx: {} Error during abort", transaction.getIdentifier(), e);
- }
- return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.<RpcError> emptySet());
- }
-
- public ListenableFuture<Void> preCommit() {
- COORDINATOR_LOG.debug("Transaction {}: PreCommit Started ", transaction.getIdentifier());
- Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
- for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
- ops.add(cohort.preCommit());
- }
- return (ListenableFuture) Futures.allAsList(ops.build());
- }
-
- public ListenableFuture<Void> commit() {
- COORDINATOR_LOG.debug("Transaction {}: Commit Started ", transaction.getIdentifier());
- Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
- for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
- ops.add(cohort.commit());
- }
- return (ListenableFuture) Futures.allAsList(ops.build());
- }
-
- public ListenableFuture<Boolean> canCommit() {
- COORDINATOR_LOG.debug("Transaction {}: CanCommit Started ", transaction.getIdentifier());
- Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
- for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
- canCommitOperations.add(cohort.canCommit());
- }
- ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
- return Futures.transform(allCanCommits, AND_FUNCTION);
- }
-
- public ListenableFuture<Void> abort() {
- COORDINATOR_LOG.debug("Transaction {}: Abort Started ", transaction.getIdentifier());
- Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
- for (DOMStoreThreePhaseCommitCohort cohort : transaction.getCohorts()) {
- ops.add(cohort.abort());
- }
- return (ListenableFuture) Futures.allAsList(ops.build());
- };
+ @Override
+ public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+ ImmutableMap.Builder<LogicalDatastoreType, DOMStoreTransactionChain> backingChainsBuilder = ImmutableMap
+ .builder();
+ for (Entry<LogicalDatastoreType, DOMStore> entry : getTxFactories().entrySet()) {
+ backingChainsBuilder.put(entry.getKey(), entry.getValue().createTransactionChain());
+ }
+ long chainId = chainNum.getAndIncrement();
+ ImmutableMap<LogicalDatastoreType, DOMStoreTransactionChain> backingChains = backingChainsBuilder.build();
+ LOG.debug("Transactoin chain {} created with listener {}, backing store chains {}", chainId, listener,
+ backingChains);
+ return new DOMDataBrokerTransactionChainImpl(chainId, backingChains, coordinator, listener);
}
@Override
- public void close() throws Exception {
-
+ public ListenableFuture<RpcResult<TransactionStatus>> commit(final DOMDataWriteTransaction transaction,
+ final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
+ LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts);
+ return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> absent());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * NormalizedNode implementation of {@link TransactionChain} which is backed
+ * by several {@link DOMStoreTransactionChain} differentiated by provided
+ * {@link LogicalDatastoreType} type.
+ *
+ */
+public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory<DOMStoreTransactionChain>
+ implements DOMTransactionChain, DOMDataCommitErrorListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerTransactionChainImpl.class);
+ private final DOMDataCommitExecutor coordinator;
+ private final TransactionChainListener listener;
+ private final long chainId;
+ private final AtomicLong txNum = new AtomicLong();
+ @GuardedBy("this")
+ private boolean failed = false;
+
+ /**
+ *
+ * @param chainId
+ * ID of transaction chain
+ * @param chains
+ * Backing {@link DOMStoreTransactionChain}s.
+ * @param coordinator
+ * Commit Coordinator which should be used to coordinate commits
+ * of transaction
+ * produced by this chain.
+ * @param listener
+ * Listener, which listens on transaction chain events.
+ * @throws NullPointerException
+ * If any of arguments is null.
+ */
+ public DOMDataBrokerTransactionChainImpl(final long chainId,
+ final ImmutableMap<LogicalDatastoreType, DOMStoreTransactionChain> chains,
+ final DOMDataCommitExecutor coordinator, final TransactionChainListener listener) {
+ super(chains);
+ this.chainId = chainId;
+ this.coordinator = Preconditions.checkNotNull(coordinator);
+ this.listener = Preconditions.checkNotNull(listener);
+ }
+
+ @Override
+ protected Object newTransactionIdentifier() {
+ return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement();
+ }
+
+ @Override
+ public synchronized ListenableFuture<RpcResult<TransactionStatus>> commit(
+ final DOMDataWriteTransaction transaction, final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
+ return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> of(this));
+ }
+
+ @Override
+ public synchronized void close() {
+ super.close();
+ for (DOMStoreTransactionChain subChain : getTxFactories().values()) {
+ subChain.close();
+ }
+
+ if (!failed) {
+ LOG.debug("Transaction chain {}Â successfully finished.", this);
+ listener.onTransactionChainSuccessful(this);
+ }
+ }
+
+ @Override
+ public synchronized void onCommitFailed(final DOMDataWriteTransaction tx, final Throwable cause) {
+ failed = true;
+ LOG.debug("Transaction chain {}Â failed.", this, cause);
+ listener.onTransactionChainFailed(this, tx, cause);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+/**
+ *
+ * Implementation of blocking three phase commit coordinator, which which
+ * supports coordination on multiple {@link DOMStoreThreePhaseCommitCohort}.
+ *
+ * This implementation does not support cancelation of commit,
+ *
+ * In order to advance to next phase of three phase commit all subtasks of
+ * previous step must be finish.
+ *
+ * This executor does not have an upper bound on subtask timeout.
+ *
+ *
+ */
+public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DOMDataCommitCoordinatorImpl.class);
+
+ /**
+ * Runs AND binary operation between all booleans in supplied iteration of booleans.
+ *
+ * This method will stop evaluating iterables if first found is false.
+ */
+ private static final Function<Iterable<Boolean>, Boolean> AND_FUNCTION = new Function<Iterable<Boolean>, Boolean>() {
+
+ @Override
+ public Boolean apply(final Iterable<Boolean> input) {
+ for(boolean value : input) {
+ if(!value) {
+ return Boolean.FALSE;
+ }
+ }
+ return Boolean.TRUE;
+ }
+ };
+
+ private final ListeningExecutorService executor;
+
+ /**
+ *
+ * Construct DOMDataCommitCoordinator which uses supplied executor to
+ * process commit coordinations.
+ *
+ * @param executor
+ */
+ public DOMDataCommitCoordinatorImpl(final ListeningExecutorService executor) {
+ this.executor = Preconditions.checkNotNull(executor, "executor must not be null.");
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> submit(final DOMDataWriteTransaction transaction,
+ final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
+ Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
+ Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
+ Preconditions.checkArgument(listener != null, "Listener must not be null");
+ LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
+ ListenableFuture<RpcResult<TransactionStatus>> commitFuture = executor.submit(new CommitCoordinationTask(
+ transaction, cohorts, listener));
+ if (listener.isPresent()) {
+ Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
+ }
+ return commitFuture;
+ }
+
+ /**
+ *
+ * Phase of 3PC commit
+ *
+ * Represents phase of 3PC Commit
+ *
+ *
+ */
+ private static enum CommitPhase {
+ /**
+ *
+ * Commit Coordination Task is submitted for executing
+ *
+ */
+ SUBMITTED,
+ /**
+ * Commit Coordination Task is in can commit phase of 3PC
+ *
+ */
+ CAN_COMMIT,
+ /**
+ * Commit Coordination Task is in pre-commit phase of 3PC
+ *
+ */
+ PRE_COMMIT,
+ /**
+ * Commit Coordination Task is in commit phase of 3PC
+ *
+ */
+ COMMIT,
+ /**
+ * Commit Coordination Task is in abort phase of 3PC
+ *
+ */
+ ABORT
+ }
+
+ /**
+ *
+ * Implementation of blocking three-phase commit-coordination tasks without
+ * support of cancelation.
+ *
+ */
+ private static class CommitCoordinationTask implements Callable<RpcResult<TransactionStatus>> {
+
+ private final DOMDataWriteTransaction tx;
+ private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
+
+ @GuardedBy("this")
+ private CommitPhase currentPhase;
+
+ public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
+ final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
+ final Optional<DOMDataCommitErrorListener> listener) {
+ this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
+ this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
+ this.currentPhase = CommitPhase.SUBMITTED;
+ }
+
+ @Override
+ public RpcResult<TransactionStatus> call() throws TransactionCommitFailedException {
+
+ try {
+ canCommitBlocking();
+ preCommitBlocking();
+ return commitBlocking();
+ } catch (TransactionCommitFailedException e) {
+ LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
+ abortBlocking(e);
+ throw e;
+ }
+ }
+
+ /**
+ *
+ * Invokes canCommit on underlying cohorts and blocks till
+ * all results are returned.
+ *
+ * Valid state transition is from SUBMITTED to CAN_COMMIT,
+ * if currentPhase is not SUBMITTED throws IllegalStateException.
+ *
+ * @throws TransactionCommitFailedException
+ * If one of cohorts failed can Commit
+ *
+ */
+ private void canCommitBlocking() throws TransactionCommitFailedException {
+ final Boolean canCommitResult = canCommitAll().checkedGet();
+ if (!canCommitResult) {
+ throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
+ }
+ }
+
+ /**
+ *
+ * Invokes preCommit on underlying cohorts and blocks till
+ * all results are returned.
+ *
+ * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
+ * state is not CAN_COMMIT
+ * throws IllegalStateException.
+ *
+ * @throws TransactionCommitFailedException
+ * If one of cohorts failed preCommit
+ *
+ */
+ private void preCommitBlocking() throws TransactionCommitFailedException {
+ preCommitAll().checkedGet();
+ }
+
+ /**
+ *
+ * Invokes commit on underlying cohorts and blocks till
+ * all results are returned.
+ *
+ * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
+ * IllegalStateException.
+ *
+ * @throws TransactionCommitFailedException
+ * If one of cohorts failed preCommit
+ *
+ */
+ private RpcResult<TransactionStatus> commitBlocking() throws TransactionCommitFailedException {
+ commitAll().checkedGet();
+ return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.<RpcError> emptySet());
+ }
+
+ /**
+ * Aborts transaction.
+ *
+ * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
+ * cohorts, blocks
+ * for all results. If any of the abort failed throws
+ * IllegalStateException,
+ * which will contains originalCause as suppressed Exception.
+ *
+ * If aborts we're successful throws supplied exception
+ *
+ * @param originalCause
+ * Exception which should be used to fail transaction for
+ * consumers of transaction
+ * future and listeners of transaction failure.
+ * @throws TransactionCommitFailedException
+ * on invocation of this method.
+ * originalCa
+ * @throws IllegalStateException
+ * if abort failed.
+ */
+ private void abortBlocking(final TransactionCommitFailedException originalCause)
+ throws TransactionCommitFailedException {
+ LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, originalCause);
+ Exception cause = originalCause;
+ try {
+ abortAsyncAll().get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
+ cause = new IllegalStateException("Abort failed.", e);
+ cause.addSuppressed(e);
+ }
+ Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
+ }
+
+ /**
+ *
+ * Invokes preCommit on underlying cohorts and returns future
+ * which will complete once all preCommit on cohorts completed or
+ * failed.
+ *
+ *
+ * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
+ * state is not CAN_COMMIT
+ * throws IllegalStateException.
+ *
+ * @return Future which will complete once all cohorts completed
+ * preCommit.
+ * Future throws TransactionCommitFailedException
+ * If any of cohorts failed preCommit
+ *
+ */
+ private CheckedFuture<Void, TransactionCommitFailedException> preCommitAll() {
+ changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
+ Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+ for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ ops.add(cohort.preCommit());
+ }
+ /*
+ * We are returing all futures as list, not only succeeded ones in
+ * order to fail composite future if any of them failed.
+ * See Futures.allAsList for this description.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
+ return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER);
+ }
+
+ /**
+ *
+ * Invokes commit on underlying cohorts and returns future which
+ * completes
+ * once all commits on cohorts are completed.
+ *
+ * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
+ * IllegalStateException
+ *
+ * @return Future which will complete once all cohorts completed
+ * commit.
+ * Future throws TransactionCommitFailedException
+ * If any of cohorts failed preCommit
+ *
+ */
+ private CheckedFuture<Void, TransactionCommitFailedException> commitAll() {
+ changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
+ Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+ for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ ops.add(cohort.commit());
+ }
+ /*
+ * We are returing all futures as list, not only succeeded ones in
+ * order to fail composite future if any of them failed.
+ * See Futures.allAsList for this description.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
+ return Futures.makeChecked(compositeResult, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+ }
+
+ /**
+ *
+ * Invokes canCommit on underlying cohorts and returns composite future
+ * which will contains {@link Boolean#TRUE} only and only if
+ * all cohorts returned true.
+ *
+ * Valid state transition is from SUBMITTED to CAN_COMMIT,
+ * if currentPhase is not SUBMITTED throws IllegalStateException.
+ *
+ * @return Future which will complete once all cohorts completed
+ * preCommit.
+ * Future throws TransactionCommitFailedException
+ * If any of cohorts failed preCommit
+ *
+ */
+ private CheckedFuture<Boolean, TransactionCommitFailedException> canCommitAll() {
+ changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
+ Builder<ListenableFuture<Boolean>> canCommitOperations = ImmutableList.builder();
+ for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ canCommitOperations.add(cohort.canCommit());
+ }
+ ListenableFuture<List<Boolean>> allCanCommits = Futures.allAsList(canCommitOperations.build());
+ ListenableFuture<Boolean> allSuccessFuture = Futures.transform(allCanCommits, AND_FUNCTION);
+ return Futures
+ .makeChecked(allSuccessFuture, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER);
+
+ }
+
+ /**
+ *
+ * Invokes abort on underlying cohorts and returns future which
+ * completes
+ * once all abort on cohorts are completed.
+ *
+ * @return Future which will complete once all cohorts completed
+ * abort.
+ *
+ */
+ private ListenableFuture<Void> abortAsyncAll() {
+ changeStateFrom(currentPhase, CommitPhase.ABORT);
+ Builder<ListenableFuture<Void>> ops = ImmutableList.builder();
+ for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ ops.add(cohort.abort());
+ }
+ /*
+ * We are returing all futures as list, not only succeeded ones in
+ * order to fail composite future if any of them failed.
+ * See Futures.allAsList for this description.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops.build());
+ return compositeResult;
+ }
+
+ /**
+ * Change phase / state of transaction from expected value to new value
+ *
+ * This method checks state and updates state to new state of
+ * of this task if current state equals expected state.
+ * If expected state and current state are different raises
+ * IllegalStateException
+ * which means there is probably bug in implementation of commit
+ * coordination.
+ *
+ * If transition is successful, it logs transition on DEBUG level.
+ *
+ * @param currentExpected
+ * Required phase for change of state
+ * @param newState
+ * New Phase which will be entered by transaction.
+ * @throws IllegalStateException
+ * If currentState of task does not match expected state
+ */
+ private synchronized void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
+ Preconditions.checkState(currentPhase.equals(currentExpected),
+ "Invalid state transition: Tx: %s current state: %s new state: %s", tx.getIdentifier(),
+ currentPhase, newState);
+ LOG.debug("Transaction {}: Phase {} Started ", tx.getIdentifier(), newState);
+ currentPhase = newState;
+ };
+
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+
+/**
+ *
+ * Utility implemetation of {@link FutureCallback} which is responsible
+ * for invoking {@link DOMDataCommitErrorListener} on TransactionCommit failed.
+ *
+ * When {@link #onFailure(Throwable)} is invoked, supplied {@link DOMDataCommitErrorListener}
+ * callback is invoked with associated transaction and throwable is invoked on listener.
+ *
+ */
+class DOMDataCommitErrorInvoker implements FutureCallback<RpcResult<TransactionStatus>> {
+
+ private final DOMDataWriteTransaction tx;
+ private final DOMDataCommitErrorListener listener;
+
+
+ /**
+ *
+ * Construct new DOMDataCommitErrorInvoker.
+ *
+ * @param transaction Transaction which should be passed as argument to {@link DOMDataCommitErrorListener#onCommitFailed(DOMDataWriteTransaction, Throwable)}
+ * @param listener Listener which should be invoked on error.
+ */
+ public DOMDataCommitErrorInvoker(DOMDataWriteTransaction transaction, DOMDataCommitErrorListener listener) {
+ this.tx = Preconditions.checkNotNull(transaction, "Transaction must not be null");
+ this.listener = Preconditions.checkNotNull(listener, "Listener must not be null");
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ listener.onCommitFailed(tx, t);
+ }
+
+ @Override
+ public void onSuccess(RpcResult<TransactionStatus> result) {
+ // NOOP
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import java.util.EventListener;
+
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+
+/**
+ *
+ * Listener on transaction failure which may be passed to
+ * {@link DOMDataCommitExecutor}. This listener is notified during transaction
+ * processing, before result is delivered to other client code outside MD-SAL.
+ * This allows implementors to update their internal state before transaction
+ * failure is visible to client code.
+ *
+ * This is internal API for MD-SAL implementations, for consumer facing error
+ * listeners see {@link TransactionChainListener}.
+ *
+ */
+interface DOMDataCommitErrorListener extends EventListener {
+
+ /**
+ *
+ * Callback which is invoked on transaction failure during three phase
+ * commit in {@link DOMDataCommitExecutor}.
+ *
+ *
+ * Implementation of this callback MUST NOT do any blocking calls or any
+ * calls to MD-SAL, since this callback is invoked synchronously on MD-SAL
+ * Broker coordination thread.
+ *
+ * @param tx
+ * Transaction which failed
+ * @param cause
+ * Failure reason
+ */
+ void onCommitFailed(DOMDataWriteTransaction tx, Throwable cause);
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Executor of Three Phase Commit coordination for
+ * {@link DOMDataWriteTransaction} transactions.
+ *
+ * Implementations are responsible for executing implementation of three-phase
+ * commit protocol on supplied {@link DOMStoreThreePhaseCommitCohort}s.
+ *
+ *
+ */
+interface DOMDataCommitExecutor {
+
+ /**
+ * Submits supplied transaction to be executed in context of provided
+ * cohorts.
+ *
+ * Transaction is used only as a context, cohorts should be associated with
+ * this transaction.
+ *
+ * @param tx
+ * Transaction to be used as context for reporting
+ * @param cohort
+ * DOM Store cohorts representing provided transaction, its
+ * subtransactoins.
+ * @param listener
+ * Error listener which should be notified if transaction failed.
+ * @return ListenableFuture which contains RpcResult with
+ * {@link TransactionStatus#COMMITED} if commit coordination on
+ * cohorts finished successfully.
+ *
+ */
+ ListenableFuture<RpcResult<TransactionStatus>> submit(DOMDataWriteTransaction tx,
+ Iterable<DOMStoreThreePhaseCommitCohort> cohort, Optional<DOMDataCommitErrorListener> listener);
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ *
+ * Implementation prototype of commit method for
+ * {@link DOMForwardedWriteTransaction}.
+ *
+ */
+public interface DOMDataCommitImplementation {
+
+ /**
+ * User-supplied implementation of {@link DOMDataWriteTransaction#commit()}
+ * for transaction.
+ *
+ * Callback invoked when {@link DOMDataWriteTransaction#commit()} is invoked
+ * on transaction created by this factory.
+ *
+ * @param transaction
+ * Transaction on which {@link DOMDataWriteTransaction#commit()}
+ * was invoked.
+ * @param cohorts
+ * Iteration of cohorts for subtransactions associated with
+ * commited transaction.
+ *
+ */
+ ListenableFuture<RpcResult<TransactionStatus>> commit(final DOMDataWriteTransaction transaction,
+ final Iterable<DOMStoreThreePhaseCommitCohort> cohorts);
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ *
+ * Read Only Transaction, which is composed of several
+ * {@link DOMStoreReadTransaction} transactions. Subtransaction is selected by
+ * {@link LogicalDatastoreType} type parameter in
+ * {@link #read(LogicalDatastoreType, InstanceIdentifier)}.
+ */
+class DOMForwardedReadOnlyTransaction extends
+ AbstractDOMForwardedCompositeTransaction<LogicalDatastoreType, DOMStoreReadTransaction> implements
+ DOMDataReadTransaction {
+
+ protected DOMForwardedReadOnlyTransaction(final Object identifier,
+ final ImmutableMap<LogicalDatastoreType, DOMStoreReadTransaction> backingTxs) {
+ super(identifier, backingTxs);
+ }
+
+ @Override
+ public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
+ final InstanceIdentifier path) {
+ return getSubtransaction(store).read(path);
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ *
+ * Read-Write Transaction, which is composed of several
+ * {@link DOMStoreReadTransaction} transactions. Subtransaction is selected by
+ * {@link LogicalDatastoreType} type parameter in:
+ *
+ * <ul>
+ * <li>{@link #read(LogicalDatastoreType, InstanceIdentifier)}
+ * <li>{@link #put(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+ * <li>{@link #delete(LogicalDatastoreType, InstanceIdentifier)}
+ * <li>{@link #merge(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+ * </ul>
+ * {@link #commit()} will result in invocation of
+ * {@link DOMDataCommitImplementation#commit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
+ * invocation with all {@link DOMStoreThreePhaseCommitCohort} for underlying
+ * transactions.
+ *
+ */
+class DOMForwardedReadWriteTransaction extends DOMForwardedWriteTransaction<DOMStoreReadWriteTransaction> implements
+ DOMDataReadWriteTransaction {
+
+ protected DOMForwardedReadWriteTransaction(final Object identifier,
+ final ImmutableMap<LogicalDatastoreType, DOMStoreReadWriteTransaction> backingTxs,
+ final DOMDataCommitImplementation commitImpl) {
+ super(identifier, backingTxs, commitImpl);
+ }
+
+ @Override
+ public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
+ final InstanceIdentifier path) {
+ return getSubtransaction(store).read(path);
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * /**
+ *
+ * Read-Write Transaction, which is composed of several
+ * {@link DOMStoreReadTransaction} transactions. Subtransaction is selected by
+ * {@link LogicalDatastoreType} type parameter in:
+ *
+ * <ul>
+ * <li>{@link #put(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+ * <li>{@link #delete(LogicalDatastoreType, InstanceIdentifier)}
+ * <li>{@link #merge(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
+ * </ul>
+ * <p>
+ * {@link #commit()} will result in invocation of
+ * {@link DOMDataCommitImplementation#commit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
+ * invocation with all {@link DOMStoreThreePhaseCommitCohort} for underlying
+ * transactions.
+ *
+ * @param <T>
+ * Subtype of {@link DOMStoreWriteTransaction} which is used as
+ * subtransaction.
+ */
+class DOMForwardedWriteTransaction<T extends DOMStoreWriteTransaction> extends
+ AbstractDOMForwardedCompositeTransaction<LogicalDatastoreType, T> implements DOMDataWriteTransaction {
+
+ @GuardedBy("this")
+ private DOMDataCommitImplementation commitImpl;
+
+ @GuardedBy("this")
+ private boolean canceled;
+ @GuardedBy("this")
+ private ListenableFuture<RpcResult<TransactionStatus>> commitFuture;
+
+ protected DOMForwardedWriteTransaction(final Object identifier,
+ final ImmutableMap<LogicalDatastoreType, T> backingTxs, final DOMDataCommitImplementation commitImpl) {
+ super(identifier, backingTxs);
+ this.commitImpl = Preconditions.checkNotNull(commitImpl, "commitImpl must not be null.");
+ }
+
+ @Override
+ public void put(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ checkNotReady();
+ getSubtransaction(store).write(path, data);
+ }
+
+ @Override
+ public void delete(final LogicalDatastoreType store, final InstanceIdentifier path) {
+ checkNotReady();
+ getSubtransaction(store).delete(path);
+ }
+
+ @Override
+ public void merge(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ checkNotReady();
+ getSubtransaction(store).merge(path, data);
+ }
+
+ @Override
+ public synchronized void cancel() {
+ checkState(!canceled, "Transaction was canceled.");
+ if (commitFuture != null) {
+ // FIXME: Implement cancelation of commit future
+ // when Broker impl will support cancelation.
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+ canceled = true;
+ commitImpl = null;
+
+ }
+
+ @Override
+ public synchronized ListenableFuture<RpcResult<TransactionStatus>> commit() {
+ checkNotReady();
+
+ ImmutableList.Builder<DOMStoreThreePhaseCommitCohort> cohortsBuilder = ImmutableList.builder();
+ for (DOMStoreWriteTransaction subTx : getSubtransactions()) {
+ cohortsBuilder.add(subTx.ready());
+ }
+ ImmutableList<DOMStoreThreePhaseCommitCohort> cohorts = cohortsBuilder.build();
+ commitFuture = commitImpl.commit(this, cohorts);
+ return commitFuture;
+ }
+
+ private void checkNotReady() {
+ checkNotCanceled();
+ checkNotCommited();
+ }
+
+ private void checkNotCanceled() {
+ Preconditions.checkState(!canceled, "Transaction was canceled.");
+ }
+
+ private void checkNotCommited() {
+ checkState(commitFuture == null, "Transaction was already commited.");
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import java.util.concurrent.ExecutionException;
+
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+
+/**
+ *
+ * Utility exception mapper which translates {@link Exception}
+ * to {@link TransactionCommitFailedException}.
+ *
+ * This mapper is intended to be used with {@link Futures#makeChecked(com.google.common.util.concurrent.ListenableFuture, Function)}
+ * <ul>
+ * <li>if exception is {@link TransactionCommitFailedException} or one of its subclasses returns original exception.
+ * <li>if exception is {@link ExecutionException} and cause is {@link TransactionCommitFailedException} return cause
+ * <li>otherwise returns {@link TransactionCommitFailedException} with original exception as a cause.
+ * </ul>
+ *
+ */
+final class TransactionCommitFailedExceptionMapper implements
+ Function<Exception, TransactionCommitFailedException> {
+
+ static final TransactionCommitFailedExceptionMapper PRE_COMMIT_MAPPER = create("canCommit");
+
+ static final TransactionCommitFailedExceptionMapper CAN_COMMIT_ERROR_MAPPER = create("preCommit");
+
+ static final TransactionCommitFailedExceptionMapper COMMIT_ERROR_MAPPER = create("commit");
+
+ private final String opName;
+
+ private TransactionCommitFailedExceptionMapper(final String opName) {
+ this.opName = Preconditions.checkNotNull(opName);
+ }
+
+ public static final TransactionCommitFailedExceptionMapper create(final String opName) {
+ return new TransactionCommitFailedExceptionMapper(opName);
+ }
+
+ @Override
+ public TransactionCommitFailedException apply(final Exception e) {
+ // If excetion is TransactionCommitFailedException
+ // we reuse it directly.
+ if (e instanceof TransactionCommitFailedException) {
+ return (TransactionCommitFailedException) e;
+ }
+ // If error is ExecutionException which was caused by cause of
+ // TransactionCommitFailedException
+ // we reuse original cause
+ if (e instanceof ExecutionException && e.getCause() instanceof TransactionCommitFailedException) {
+ return (TransactionCommitFailedException) e.getCause();
+ }
+ if (e instanceof InterruptedException) {
+ return new TransactionCommitFailedException(opName + " failed - DOMStore was interupted.", e);
+ }
+ // Otherwise we are using new exception, with original cause
+ return new TransactionCommitFailedException(opName + " failed", e);
+ }
+}
\ No newline at end of file
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataPreconditionFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ConflictingModificationAppliedException;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTree;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTreeCandidate;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTreeModification;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTreeSnapshot;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataValidationFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.data.InMemoryDataTreeFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
public ListenableFuture<Boolean> canCommit() {
return executor.submit(new Callable<Boolean>() {
@Override
- public Boolean call() {
+ public Boolean call() throws TransactionCommitFailedException {
try {
dataTree.validate(modification);
LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
return true;
- } catch (DataPreconditionFailedException e) {
+ } catch (ConflictingModificationAppliedException e) {
+ LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
+ e.getPath());
+ throw new OptimisticLockFailedException("Optimistic lock failed.",e);
+ } catch (DataValidationFailedException e) {
LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
e.getPath(), e);
- return false;
+ throw new TransactionCommitFailedException("Data did not pass validation.",e);
}
}
});
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl.tree;
+
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+/**
+ * Exception thrown when a proposed change fails validation before being
+ * applied into the Data Tree because the Data Tree has been modified
+ * in way that a conflicting
+ * node is present.
+ */
+public class ConflictingModificationAppliedException extends DataValidationFailedException {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public ConflictingModificationAppliedException(final InstanceIdentifier path, final String message, final Throwable cause) {
+ super(path, message, cause);
+ }
+
+ public ConflictingModificationAppliedException(final InstanceIdentifier path, final String message) {
+ super(path, message);
+ }
+
+}
/**
* Validate whether a particular modification can be applied to the data tree.
*/
- void validate(DataTreeModification modification) throws DataPreconditionFailedException;
+ void validate(DataTreeModification modification) throws DataValidationFailedException;
/**
* Prepare a modification for commit.
* the datastore has been concurrently modified such that a conflicting
* node is present, or the modification is structurally incorrect.
*/
-public class DataPreconditionFailedException extends Exception {
+public class DataValidationFailedException extends Exception {
private static final long serialVersionUID = 1L;
private final InstanceIdentifier path;
* @param path Object path which caused this exception
* @param message Specific message describing the failure
*/
- public DataPreconditionFailedException(final InstanceIdentifier path, final String message) {
+ public DataValidationFailedException(final InstanceIdentifier path, final String message) {
this(path, message, null);
}
/**
* @param message Specific message describing the failure
* @param cause Exception which triggered this failure, may be null
*/
- public DataPreconditionFailedException(final InstanceIdentifier path, final String message, final Throwable cause) {
+ public DataValidationFailedException(final InstanceIdentifier path, final String message, final Throwable cause) {
super(message, cause);
this.path = Preconditions.checkNotNull(path);
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl.tree;
+
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+/**
+ * Exception thrown when a proposed change fails validation before being
+ * applied into the datastore because of incorrect structure of user supplied
+ * data.
+ *
+ */
+public class IncorrectDataStructureException extends DataValidationFailedException {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public IncorrectDataStructureException(final InstanceIdentifier path, final String message, final Throwable cause) {
+ super(path, message, cause);
+ }
+
+ public IncorrectDataStructureException(final InstanceIdentifier path, final String message) {
+ super(path, message);
+ }
+
+}
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataPreconditionFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataValidationFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTree;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTreeCandidate;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTreeModification;
}
@Override
- public void validate(final DataTreeModification modification) throws DataPreconditionFailedException {
+ public void validate(final DataTreeModification modification) throws DataValidationFailedException {
Preconditions.checkArgument(modification instanceof InMemoryDataTreeModification, "Invalid modification class %s", modification.getClass());
final InMemoryDataTreeModification m = (InMemoryDataTreeModification)modification;
*/
package org.opendaylight.controller.md.sal.dom.store.impl.tree.data;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataPreconditionFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataValidationFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreTreeNode;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.spi.TreeNode;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.spi.Version;
* @param current Metadata Node to which modification should be applied
* @return true if modification is applicable
* false if modification is no applicable
- * @throws DataPreconditionFailedException
+ * @throws DataValidationFailedException
*/
- void checkApplicable(InstanceIdentifier path, NodeModification modification, Optional<TreeNode> current) throws DataPreconditionFailedException;
+ void checkApplicable(InstanceIdentifier path, NodeModification modification, Optional<TreeNode> current) throws DataValidationFailedException;
}
import java.util.Map;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataPreconditionFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataValidationFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.data.DataNodeContainerModificationStrategy.ListEntryModificationStrategy;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.data.ValueNodeModificationStrategy.LeafSetEntryModificationStrategy;
@Override
protected void checkWriteApplicable(final InstanceIdentifier path, final NodeModification modification,
- final Optional<TreeNode> current) throws DataPreconditionFailedException {
+ final Optional<TreeNode> current) throws DataValidationFailedException {
// FIXME: Implement proper write check for replacement of node container
// prerequisite is to have transaction chain available for clients
// otherwise this will break chained writes to same node.
@Override
protected void checkSubtreeModificationApplicable(final InstanceIdentifier path, final NodeModification modification,
- final Optional<TreeNode> current) throws DataPreconditionFailedException {
- checkDataPrecondition(path, current.isPresent(), "Node was deleted by other transaction.");
+ final Optional<TreeNode> current) throws DataValidationFailedException {
+ checkConflicting(path, current.isPresent(), "Node was deleted by other transaction.");
checkChildPreconditions(path, modification, current);
}
- private void checkChildPreconditions(final InstanceIdentifier path, final NodeModification modification, final Optional<TreeNode> current) throws DataPreconditionFailedException {
+ private void checkChildPreconditions(final InstanceIdentifier path, final NodeModification modification, final Optional<TreeNode> current) throws DataValidationFailedException {
final TreeNode currentMeta = current.get();
for (NodeModification childMod : modification.getChildren()) {
final PathArgument childId = childMod.getIdentifier();
@Override
protected void checkMergeApplicable(final InstanceIdentifier path, final NodeModification modification,
- final Optional<TreeNode> current) throws DataPreconditionFailedException {
+ final Optional<TreeNode> current) throws DataValidationFailedException {
if(current.isPresent()) {
checkChildPreconditions(path, modification,current);
}
import java.util.List;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataPreconditionFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ConflictingModificationAppliedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataValidationFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.IncorrectDataStructureException;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.data.DataNodeContainerModificationStrategy.ContainerModificationStrategy;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.data.DataNodeContainerModificationStrategy.UnkeyedListItemModificationStrategy;
return null;
}
- public static boolean checkDataPrecondition(final InstanceIdentifier path, final boolean condition, final String message) throws DataPreconditionFailedException {
+ public static boolean checkConflicting(final InstanceIdentifier path, final boolean condition, final String message) throws ConflictingModificationAppliedException {
if(!condition) {
- throw new DataPreconditionFailedException(path, message);
+ throw new ConflictingModificationAppliedException(path, message);
}
return condition;
}
}
}
- private static final void checkNotConflicting(final InstanceIdentifier path, final TreeNode original, final TreeNode current) throws DataPreconditionFailedException {
- checkDataPrecondition(path, original.getVersion().equals(current.getVersion()),
+ private static final void checkNotConflicting(final InstanceIdentifier path, final TreeNode original, final TreeNode current) throws ConflictingModificationAppliedException {
+ checkConflicting(path, original.getVersion().equals(current.getVersion()),
"Node was replaced by other transaction.");
- checkDataPrecondition(path, original.getSubtreeVersion().equals(current.getSubtreeVersion()),
+ checkConflicting(path, original.getSubtreeVersion().equals(current.getSubtreeVersion()),
"Node children was modified by other transaction");
}
}
@Override
- public final void checkApplicable(final InstanceIdentifier path,final NodeModification modification, final Optional<TreeNode> current) throws DataPreconditionFailedException {
+ public final void checkApplicable(final InstanceIdentifier path,final NodeModification modification, final Optional<TreeNode> current) throws DataValidationFailedException {
switch (modification.getType()) {
case DELETE:
checkDeleteApplicable(modification, current);
}
- protected void checkMergeApplicable(final InstanceIdentifier path, final NodeModification modification, final Optional<TreeNode> current) throws DataPreconditionFailedException {
+ protected void checkMergeApplicable(final InstanceIdentifier path, final NodeModification modification, final Optional<TreeNode> current) throws DataValidationFailedException {
Optional<TreeNode> original = modification.getOriginal();
if (original.isPresent() && current.isPresent()) {
/*
}
}
- protected void checkWriteApplicable(final InstanceIdentifier path, final NodeModification modification, final Optional<TreeNode> current) throws DataPreconditionFailedException {
+ protected void checkWriteApplicable(final InstanceIdentifier path, final NodeModification modification, final Optional<TreeNode> current) throws DataValidationFailedException {
Optional<TreeNode> original = modification.getOriginal();
if (original.isPresent() && current.isPresent()) {
checkNotConflicting(path, original.get(), current.get());
} else if(original.isPresent()) {
- throw new DataPreconditionFailedException(path,"Node was deleted by other transaction.");
+ throw new ConflictingModificationAppliedException(path,"Node was deleted by other transaction.");
}
}
protected abstract TreeNode applySubtreeChange(ModifiedNode modification,
TreeNode currentMeta, Version version);
+ /**
+ *
+ * Checks is supplied {@link NodeModification} is applicable for Subtree Modification.
+ *
+ * @param path Path to current node
+ * @param modification Node modification which should be applied.
+ * @param current Current state of data tree
+ * @throws ConflictingModificationAppliedException If subtree was changed in conflicting way
+ * @throws IncorrectDataStructureException If subtree modification is not applicable (e.g. leaf node).
+ */
protected abstract void checkSubtreeModificationApplicable(InstanceIdentifier path, final NodeModification modification,
- final Optional<TreeNode> current) throws DataPreconditionFailedException;
+ final Optional<TreeNode> current) throws DataValidationFailedException;
protected abstract void verifyWrittenStructure(NormalizedNode<?, ?> writtenValue);
@Override
protected void checkSubtreeModificationApplicable(final InstanceIdentifier path, final NodeModification modification,
- final Optional<TreeNode> current) throws DataPreconditionFailedException {
- throw new DataPreconditionFailedException(path, "Subtree modification is not allowed.");
+ final Optional<TreeNode> current) throws IncorrectDataStructureException {
+ throw new IncorrectDataStructureException(path, "Subtree modification is not allowed.");
}
}
}
import static com.google.common.base.Preconditions.checkArgument;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataPreconditionFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.IncorrectDataStructureException;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.spi.TreeNode;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.spi.TreeNodeFactory;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.spi.Version;
@Override
protected void checkSubtreeModificationApplicable(final InstanceIdentifier path, final NodeModification modification,
- final Optional<TreeNode> current) throws DataPreconditionFailedException {
- throw new DataPreconditionFailedException(path, "Subtree modification is not allowed.");
+ final Optional<TreeNode> current) throws IncorrectDataStructureException {
+ throw new IncorrectDataStructureException(path, "Subtree modification is not allowed.");
}
public static class LeafSetEntryModificationStrategy extends ValueNodeModificationStrategy<LeafListSchemaNode> {
package org.opendaylight.controller.sal.dom.broker.osgi;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.osgi.framework.ServiceReference;
return getDelegate().registerDataChangeListener(store, path, listener, triggeringScope);
}
+ @Override
+ public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+ return getDelegate().createTransactionChain(listener);
+ }
+
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+
+import com.google.common.util.concurrent.SettableFuture;
+
+/**
+ * Simple implementation of {@link TransactionChainListener} for testing.
+ *
+ * This transaction chain listener does not contain any logic, only update
+ * futures ({@link #getFailFuture()} and {@link #getSuccessFuture()} when
+ * transaction chain event is retrieved.
+ *
+ */
+class BlockingTransactionChainListener implements TransactionChainListener {
+
+ private final SettableFuture<Throwable> failFuture = SettableFuture.create();
+ private final SettableFuture<Void> successFuture = SettableFuture.create();
+
+ @Override
+ public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction,
+ Throwable cause) {
+ failFuture.set(cause);
+ }
+
+ @Override
+ public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
+ successFuture.set(null);
+ }
+
+ public SettableFuture<Throwable> getFailFuture() {
+ return failFuture;
+ }
+
+ public SettableFuture<Void> getSuccessFuture() {
+ return successFuture;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class DOMTransactionChainTest {
+
+ private SchemaContext schemaContext;
+ private DOMDataBrokerImpl domBroker;
+
+ @Before
+ public void setupStore() {
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+ schemaContext = TestModel.createTestContext();
+
+ operStore.onGlobalContextUpdated(schemaContext);
+ configStore.onGlobalContextUpdated(schemaContext);
+
+ ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
+ .put(CONFIGURATION, configStore) //
+ .put(OPERATIONAL, operStore) //
+ .build();
+
+ ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+ domBroker = new DOMDataBrokerImpl(stores, executor);
+ }
+
+ @Test
+ public void testTransactionChainNoConflict() throws InterruptedException, ExecutionException, TimeoutException {
+ BlockingTransactionChainListener listener = new BlockingTransactionChainListener();
+ DOMTransactionChain txChain = domBroker.createTransactionChain(listener);
+ assertNotNull(txChain);
+
+ /**
+ * We alocate new read-write transaction and write /test
+ *
+ *
+ */
+ DOMDataReadWriteTransaction firstTx = allocateAndWrite(txChain);
+
+ /**
+ * First transaction is marked as ready, we are able to allocate chained
+ * transactions
+ */
+ ListenableFuture<RpcResult<TransactionStatus>> firstWriteTxFuture = firstTx.commit();
+
+ /**
+ * We alocate chained transaction - read transaction.
+ */
+ DOMDataReadTransaction secondReadTx = txChain.newReadOnlyTransaction();
+
+ /**
+ *
+ * We test if we are able to read data from tx, read should not fail
+ * since we are using chained transaction.
+ *
+ *
+ */
+ assertTestContainerExists(secondReadTx);
+
+ /**
+ *
+ * We alocate next transaction, which is still based on first one, but
+ * is read-write.
+ *
+ */
+ DOMDataReadWriteTransaction thirdDeleteTx = allocateAndDelete(txChain);
+
+ /**
+ * third transaction is sealed.
+ */
+ ListenableFuture<RpcResult<TransactionStatus>> thirdDeleteTxFuture = thirdDeleteTx.commit();
+
+ /**
+ * We commit first transaction
+ *
+ */
+ assertCommitSuccessful(firstWriteTxFuture);
+
+ // Alocates store transaction
+ DOMDataReadTransaction storeReadTx = domBroker.newReadOnlyTransaction();
+ /**
+ * We verify transaction is commited to store, container should exists
+ * in datastore.
+ */
+ assertTestContainerExists(storeReadTx);
+ /**
+ * We commit third transaction
+ *
+ */
+ assertCommitSuccessful(thirdDeleteTxFuture);
+
+ /**
+ * We close transaction chain.
+ */
+ txChain.close();
+
+ listener.getSuccessFuture().get(1000, TimeUnit.MILLISECONDS);
+ }
+
+ @Test
+ public void testTransactionChainNotSealed() throws InterruptedException, ExecutionException, TimeoutException {
+ BlockingTransactionChainListener listener = new BlockingTransactionChainListener();
+ DOMTransactionChain txChain = domBroker.createTransactionChain(listener);
+ assertNotNull(txChain);
+
+ /**
+ * We alocate new read-write transaction and write /test
+ *
+ *
+ */
+ allocateAndWrite(txChain);
+
+ /**
+ * We alocate chained transaction - read transaction, note first one is
+ * still not commited to datastore, so this allocation should fail with
+ * IllegalStateException.
+ */
+ try {
+ DOMDataReadTransaction secondReadTx = txChain.newReadOnlyTransaction();
+ fail("Allocation of secondReadTx should fail with IllegalStateException");
+ } catch (Exception e) {
+ assertTrue(e instanceof IllegalStateException);
+ }
+ }
+
+ private static DOMDataReadWriteTransaction allocateAndDelete(DOMTransactionChain txChain)
+ throws InterruptedException, ExecutionException {
+ DOMDataReadWriteTransaction tx = txChain.newReadWriteTransaction();
+
+ /**
+ * We test existence of /test in third transaction container should
+ * still be visible from first one (which is still uncommmited).
+ *
+ */
+ assertTestContainerExists(tx);
+
+ /**
+ * We delete node in third transaction
+ */
+ tx.delete(LogicalDatastoreType.OPERATIONAL, TestModel.TEST_PATH);
+ return tx;
+ }
+
+ private static DOMDataReadWriteTransaction allocateAndWrite(DOMTransactionChain txChain)
+ throws InterruptedException, ExecutionException {
+ DOMDataReadWriteTransaction tx = txChain.newReadWriteTransaction();
+ assertTestContainerWrite(tx);
+ return tx;
+ }
+
+ private static void assertCommitSuccessful(ListenableFuture<RpcResult<TransactionStatus>> future)
+ throws InterruptedException, ExecutionException {
+ RpcResult<TransactionStatus> rpcResult = future.get();
+ assertTrue(rpcResult.isSuccessful());
+ assertEquals(TransactionStatus.COMMITED, rpcResult.getResult());
+ }
+
+ private static void assertTestContainerExists(DOMDataReadTransaction readTx) throws InterruptedException,
+ ExecutionException {
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> readFuture = readTx.read(OPERATIONAL, TestModel.TEST_PATH);
+ Optional<NormalizedNode<?, ?>> readedData = readFuture.get();
+ assertTrue(readedData.isPresent());
+ }
+
+ private static void assertTestContainerWrite(DOMDataReadWriteTransaction tx) throws InterruptedException,
+ ExecutionException {
+ tx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ assertTestContainerExists(tx);
+ }
+}
module basic-module {
- namespace "basic:module";
-
- prefix "basmod";
-
- import referenced-module {prefix refmo; revision-date 2013-12-2;}
-
- revision 2013-12-2 {
- }
-
- container cont {
- container cont1 {
- leaf lf11 {
- type identityref {
- base "refmo:iden";
- }
- }
- }
- leaf lfStr {
- type string;
- }
- leaf lfInt8 {
- type int8;
- }
-
- leaf lfInt16 {
- type int16;
- }
-
- leaf lfInt32 {
- type int32;
- }
-
- leaf lfInt64 {
- type int64;
- }
-
- leaf lfUint8 {
- type uint8;
- }
-
- leaf lfUint16 {
- type uint16;
- }
-
- leaf lfUint32 {
- type uint32;
- }
-
- leaf lfUint64 {
- type uint64;
- }
-
- leaf lfBinary {
- type binary;
- }
-
- leaf lfBits {
- type bits {
- bit one;
- bit two;
- bit three;
- }
- }
-
- leaf lfEnumeration {
- type enumeration {
- enum enum1;
- enum enum2;
- enum enum3;
- }
- }
-
- leaf lfEmpty {
- type empty;
- }
-
- leaf lfBoolean {
- type boolean;
- }
-
- leaf lfUnion {
- type union {
- type int8;
- type string;
- type bits {
- bit first;
- bit second;
- }
- type boolean;
- }
- }
-
- leaf lfLfref {
- type leafref {
- path "/cont/lfBoolean";
- }
- }
-
- leaf lfInIdentifier {
- type instance-identifier;
- }
-
- }
-
-}
\ No newline at end of file
+ namespace "basic:module";
+
+ prefix "basmod";
+
+ import referenced-module {prefix refmo; revision-date 2013-12-2;}
+
+ revision 2013-12-2 {
+ }
+
+ container cont {
+ container cont1 {
+ leaf lf11 {
+ type identityref {
+ base "refmo:iden";
+ }
+ }
+ }
+ leaf lfStr {
+ type string;
+ }
+ leaf lfInt8 {
+ type int8;
+ }
+
+ leaf lfInt16 {
+ type int16;
+ }
+
+ leaf lfInt32 {
+ type int32;
+ }
+
+ leaf lfInt64 {
+ type int64;
+ }
+
+ leaf lfUint8 {
+ type uint8;
+ }
+
+ leaf lfUint16 {
+ type uint16;
+ }
+
+ leaf lfUint32 {
+ type uint32;
+ }
+
+ leaf lfUint64 {
+ type uint64;
+ }
+
+ leaf lfBinary {
+ type binary;
+ }
+
+ leaf lfBits {
+ type bits {
+ bit one;
+ bit two;
+ bit three;
+ }
+ }
+
+ leaf lfEnumeration {
+ type enumeration {
+ enum enum1;
+ enum enum2;
+ enum enum3;
+ }
+ }
+
+ leaf lfEmpty {
+ type empty;
+ }
+
+ leaf lfBoolean {
+ type boolean;
+ }
+
+ leaf lfUnion {
+ type union {
+ type int8;
+ type string;
+ type bits {
+ bit first;
+ bit second;
+ }
+ type boolean;
+ }
+ }
+
+ leaf lfLfref {
+ type leafref {
+ path "/cont/lfBoolean";
+ }
+ }
+
+ leaf lfInIdentifier {
+ type instance-identifier;
+ }
+
+ }
+
+}