- Added new AsyncWriteTransaction#submit and deprecated AsyncWriteTransaction#commit.
- Modified unit tests and current code (in the toaster) in the
controller using commit.
Change-Id: I92317d01427bf442def8e7217ccb13313a0fd229
Signed-off-by: tpantelis <tpanteli@brocade.com>
import java.util.Collections;
import java.util.Map.Entry;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.Identifiable;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
/**
*
getDelegate().delete(store, normalized);
}
- protected final ListenableFuture<RpcResult<TransactionStatus>> doCommit() {
- return getDelegate().commit();
+ protected final CheckedFuture<Void,TransactionCommitFailedException> doSubmit() {
+ return getDelegate().submit();
}
protected final boolean doCancel() {
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
-
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
class BindingDataWriteTransactionImpl<T extends DOMDataWriteTransaction> extends
super(delegateTx, codec);
}
-
-
@Override
public void put(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
doPut(store, path, data);
@Override
public ListenableFuture<RpcResult<TransactionStatus>> commit() {
- return doCommit();
+ return AbstractDataTransaction.convertToLegacyCommitFuture(submit());
+ }
+
+ @Override
+ public CheckedFuture<Void,TransactionCommitFailedException> submit() {
+ return doSubmit();
}
@Override
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
import org.opendaylight.controller.md.sal.common.api.data.DataReader;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
@Override
public ListenableFuture<RpcResult<TransactionStatus>> apply(final Boolean requestCommitSuccess) throws Exception {
if(requestCommitSuccess) {
- return tx.getDelegate().commit();
+ return AbstractDataTransaction.convertToLegacyCommitFuture(tx.getDelegate().submit());
}
return Futures.immediateFuture(RpcResultBuilder.<TransactionStatus>failed().withResult(TransactionStatus.FAILED).build());
}
"foo").build()).build();
initialTx.put(LogicalDatastoreType.OPERATIONAL, path(TOP_FOO_KEY),
topLevelList(TOP_FOO_KEY, fooAugment));
- assertCommit(initialTx.commit());
+ assertCommit(initialTx.submit());
}
private void delete(final InstanceIdentifier<?> path) {
WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
tx.delete(LogicalDatastoreType.OPERATIONAL, path);
- assertCommit(tx.commit());
+ assertCommit(tx.submit());
}
private void verifyRemoved(
protected void setupWithDataBroker(final DataBroker dataBroker) {
WriteTransaction initialTx = dataBroker.newWriteOnlyTransaction();
initialTx.put(CONFIGURATION, TOP, top(topLevelList(TOP_FOO_KEY)));
- assertCommit(initialTx.commit());
+ assertCommit(initialTx.submit());
}
@Test
ReadWriteTransaction writeTx = getDataBroker().newReadWriteTransaction();
writeTx.put(CONFIGURATION, TOP, top(topLevelList(TOP_BAR_KEY)));
- assertCommit(writeTx.commit());
+ assertCommit(writeTx.submit());
AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> top = topListener.event();
AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> all = allListener.event();
AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> foo = fooListener.event();
ReadWriteTransaction writeTx = getDataBroker().newReadWriteTransaction();
writeTx.merge(CONFIGURATION, TOP, top(topLevelList(TOP_BAR_KEY)));
- assertCommit(writeTx.commit());
+ assertCommit(writeTx.submit());
verifyBarOnlyAdded(topListener,allListener,fooListener,barListener);
}
ReadWriteTransaction writeTx = getDataBroker().newReadWriteTransaction();
writeTx.put(CONFIGURATION, TOP_BAR, topLevelList(TOP_BAR_KEY));
- assertCommit(writeTx.commit());
+ assertCommit(writeTx.submit());
verifyBarOnlyAdded(topListener,allListener,fooListener,barListener);
}
ReadWriteTransaction writeTx = getDataBroker().newReadWriteTransaction();
writeTx.merge(CONFIGURATION, TOP_BAR, topLevelList(TOP_BAR_KEY));
- assertCommit(writeTx.commit());
+ assertCommit(writeTx.submit());
verifyBarOnlyAdded(topListener,allListener,fooListener,barListener);
}
*/
package org.opendaylight.controller.md.sal.binding.impl.test;
-import static org.junit.Assert.assertEquals;
-
import java.util.concurrent.ExecutionException;
import org.junit.Test;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.TopBuilder;
WriteTransaction writeTx = getDataBroker().newWriteOnlyTransaction();
writeTx.put(LogicalDatastoreType.OPERATIONAL, TOP_PATH, new TopBuilder().build());
writeTx.put(LogicalDatastoreType.OPERATIONAL, NODE_PATH, new TopLevelListBuilder().setKey(TOP_LIST_KEY).build());
- assertEquals(TransactionStatus.COMMITED, writeTx.commit().get().getResult());
+ writeTx.submit().get();
}
}
*/
package org.opendaylight.controller.md.sal.binding.test;
-import static org.junit.Assert.assertEquals;
-
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import com.google.common.util.concurrent.ListenableFuture;
return domBroker;
}
- protected static final void assertCommit(final ListenableFuture<RpcResult<TransactionStatus>> commit) {
+ protected static final void assertCommit(final ListenableFuture<Void> commit) {
try {
- assertEquals(TransactionStatus.COMMITED,commit.get(500, TimeUnit.MILLISECONDS).getResult());
+ commit.get(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new IllegalStateException(e);
}
import org.opendaylight.yangtools.concepts.Path;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
/**
* <tt>true</tt> otherwise
*
*/
- public boolean cancel();
+ boolean cancel();
/**
* Store a piece of data at specified path. This acts as an add / replace
* @throws IllegalStateException
* if the transaction is no longer {@link TransactionStatus#NEW}
*/
- public void put(LogicalDatastoreType store, P path, D data);
+ void put(LogicalDatastoreType store, P path, D data);
/**
* Store a piece of data at the specified path. This acts as a merge operation,
* @throws IllegalStateException
* if the transaction is no longer {@link TransactionStatus#NEW}
*/
- public void merge(LogicalDatastoreType store, P path, D data);
+ void merge(LogicalDatastoreType store, P path, D data);
/**
* Remove a piece of data from specified path. This operation does not fail
* @throws IllegalStateException
* if the transaction is no longer {@link TransactionStatus#NEW}
*/
- public void delete(LogicalDatastoreType store, P path);
+ void delete(LogicalDatastoreType store, P path);
/**
- * Submits transaction to be applied to update logical data tree.
+ * Submits this transaction to be asynchronously applied to update the logical data tree.
+ * The returned CheckedFuture conveys the result of applying the data changes.
+ * <p>
+ * <b>Note:</b> It is strongly recommended to process the CheckedFuture result in an asynchronous
+ * manner rather than using the blocking get() method. See example usage below.
* <p>
* This call logically seals the transaction, which prevents the client from
* further changing data tree using this transaction. Any subsequent calls to
* {@link IllegalStateException}.
*
* The transaction is marked as {@link TransactionStatus#SUBMITED} and
- * enqueued into the data store backed for processing.
+ * enqueued into the data store back-end for processing.
*
* <p>
* Whether or not the commit is successful is determined by versioning
- * of data tree and validation of registered commit participants
- * {@link AsyncConfigurationCommitHandler}
- * if transaction changes {@link LogicalDatastoreType#CONFIGURATION} data tree.
- *<p>
- * The effects of successful commit of data depends on
- * other data change listeners {@link AsyncDataChangeListener} and
- * {@link AsyncConfigurationCommitHandler}, which was registered to the
- * same {@link AsyncDataBroker}, to which this transaction belongs.
- *
+ * of the data tree and validation of registered commit participants
+ * ({@link AsyncConfigurationCommitHandler})
+ * if the transaction changes the data tree.
+ * <p>
+ * The effects of a successful commit of data depends on data change listeners
+ * ({@link AsyncDataChangeListener}) and commit participants
+ * ({@link AsyncConfigurationCommitHandler}) that are registered with the data broker.
+ * <p>
+ * <h3>Example usage:</h3>
+ * <pre>
+ * private void doWrite( final int tries ) {
+ * WriteTransaction writeTx = dataBroker.newWriteOnlyTransaction();
+ *
+ * MyDataObject data = ...;
+ * InstanceIdentifier<MyDataObject> path = ...;
+ * writeTx.put( LogicalDatastoreType.OPERATIONAL, path, data );
+ *
+ * Futures.addCallback( writeTx.commit(), new FutureCallback<Void>() {
+ * public void onSuccess( Void result ) {
+ * // succeeded
+ * }
+ *
+ * public void onFailure( Throwable t ) {
+ * if( t instanceof OptimisticLockFailedException ) {
+ * if( ( tries - 1 ) > 0 ) {
+ * // do retry
+ * doWrite( tries - 1 );
+ * } else {
+ * // out of retries
+ * }
+ * } else {
+ * // failed due to another type of TransactionCommitFailedException.
+ * }
+ * } );
+ * }
+ * ...
+ * doWrite( 2 );
+ * </pre>
* <h2>Failure scenarios</h2>
* <p>
* Transaction may fail because of multiple reasons, such as
* <ul>
- * <li>Another transaction finished earlier and modified the same node in
- * non-compatible way (see below). In this case the returned future will fail with
+ * <li>Another transaction finished earlier and modified the same node in a
+ * non-compatible way (see below). In this case the returned future will fail with an
* {@link OptimisticLockFailedException}. It is the responsibility of the
* caller to create a new transaction and submit the same modification again in
- * order to update data tree.</li>
+ * order to update data tree. <i><b>Warning</b>: In most cases, retrying after an
+ * OptimisticLockFailedException will result in a high probability of success.
+ * However, there are scenarios, albeit unusual, where any number of retries will
+ * not succeed. Therefore it is strongly recommended to limit the number of retries (2 or 3)
+ * to avoid an endless loop.</i>
+ * </li>
* <li>Data change introduced by this transaction did not pass validation by
* commit handlers or data was incorrectly structured. Returned future will
- * fail with {@link DataValidationFailedException}. User should not retry to
+ * fail with a {@link DataValidationFailedException}. User should not retry to
* create new transaction with same data, since it probably will fail again.
* </li>
* </ul>
* with {@link OptimisticLockFailedException} exception, which indicates to
* client that concurrent transaction prevented the submitted transaction from being
* applied.
- *
- * @return Result of the Commit, containing success information or list of
- * encountered errors, if commit was not successful. The Future
- * blocks until {@link TransactionStatus#COMMITED} is reached.
- * Future will fail with {@link TransactionCommitFailedException} if
- * Commit of this transaction failed. TODO: Usability: Consider
- * change from ListenableFuture to
- * {@link com.google.common.util.concurrent.CheckedFuture} which
- * will throw {@link TransactionCommitFailedException}.
+ * <br>
+ * @return a CheckFuture containing the result of the commit. The Future blocks until the
+ * commit operation is complete. A successful commit returns nothing. On failure,
+ * the Future will fail with a {@link TransactionCommitFailedException} or an exception
+ * derived from TransactionCommitFailedException.
*
* @throws IllegalStateException
* if the transaction is not {@link TransactionStatus#NEW}
*/
- public ListenableFuture<RpcResult<TransactionStatus>> commit();
+ CheckedFuture<Void,TransactionCommitFailedException> submit();
+
+ /**
+ * @deprecated Use {@link #submit()} instead.
+ */
+ @Deprecated
+ ListenableFuture<RpcResult<TransactionStatus>> commit();
}
package org.opendaylight.controller.md.sal.common.api.data;
import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import com.google.common.base.Preconditions;
private Class<? extends Path<?>> pathType;
- public <P extends Path<P>> DataValidationFailedException(final Class<P> pathType,final P path, final String message, final Throwable cause) {
- super(message, cause);
+ public <P extends Path<P>> DataValidationFailedException(final Class<P> pathType,final P path,
+ final String message, final Throwable cause) {
+ super(message, cause, RpcResultBuilder.newError(ErrorType.APPLICATION, "invalid-value", message, null,
+ path != null ? path.toString() : null, cause));
this.pathType = Preconditions.checkNotNull(pathType, "path type must not be null");
this.path = Preconditions.checkNotNull(path,"path must not be null.");
}
- public <P extends Path<P>> DataValidationFailedException(final Class<P> pathType,final P path,final String message) {
- this(pathType,path,message,null);
+ public <P extends Path<P>> DataValidationFailedException(final Class<P> pathType,final P path,
+ final String message) {
+ this(pathType, path, message, null);
}
public final Path<?> getPath() {
package org.opendaylight.controller.md.sal.common.api.data;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+
/**
*
* Failure of asynchronous transaction commit caused by failure
private static final long serialVersionUID = 1L;
- protected OptimisticLockFailedException(final String message, final Throwable cause, final boolean enableSuppression,
- final boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-
public OptimisticLockFailedException(final String message, final Throwable cause) {
- super(message, cause);
+ super(message, cause, RpcResultBuilder.newError(ErrorType.APPLICATION, "resource-denied",
+ message, null, null, cause));
}
public OptimisticLockFailedException(final String message) {
- super(message);
+ this(message, null);
}
}
*/
package org.opendaylight.controller.md.sal.common.api.data;
+import java.util.Arrays;
+import java.util.List;
+
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+
+import com.google.common.collect.ImmutableList;
+
/**
*
* Failed commit of asynchronous transaction
*/
public class TransactionCommitFailedException extends Exception {
- private static final long serialVersionUID = -6138306275373237068L;
+ private static final long serialVersionUID = 1L;
- protected TransactionCommitFailedException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
+ private final List<RpcError> errorList;
+
+ public TransactionCommitFailedException(final String message, final RpcError... errors) {
+ this(message, null, errors);
}
- public TransactionCommitFailedException(final String message, final Throwable cause) {
+ public TransactionCommitFailedException(final String message, final Throwable cause,
+ final RpcError... errors) {
super(message, cause);
+
+ if( errors != null && errors.length > 0 ) {
+ errorList = ImmutableList.<RpcError>builder().addAll( Arrays.asList( errors ) ).build();
+ }
+ else {
+ // Add a default RpcError.
+ errorList = ImmutableList.of(RpcResultBuilder.newError(ErrorType.APPLICATION, null,
+ getMessage(), null, null, getCause()));
+ }
}
- public TransactionCommitFailedException(final String message) {
- super(message);
+ /**
+ * Returns additional error information about this exception.
+ *
+ * @return a List of RpcErrors. There is always at least one RpcError.
+ */
+ public List<RpcError> getErrorList() {
+ return errorList;
}
+ @Override
+ public String getMessage() {
+ return new StringBuilder( super.getMessage() ).append(", errors: ").append( errorList ).toString();
+ }
}
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification;
import org.opendaylight.yangtools.concepts.Path;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
public abstract class AbstractDataTransaction<P extends Path<P>, D extends Object> extends
AbstractDataModification<P, D> {
@Override
public boolean equals(Object obj) {
- if (this == obj)
+ if (this == obj) {
return true;
- if (obj == null)
+ }
+ if (obj == null) {
return false;
- if (getClass() != obj.getClass())
+ }
+ if (getClass() != obj.getClass()) {
return false;
+ }
AbstractDataTransaction<?, ?> other = (AbstractDataTransaction<?, ?>) obj;
if (identifier == null) {
- if (other.identifier != null)
+ if (other.identifier != null) {
return false;
- } else if (!identifier.equals(other.identifier))
+ }
+ } else if (!identifier.equals(other.identifier)) {
return false;
+ }
return true;
}
this.status = status;
this.onStatusChange(status);
}
+
+ public static ListenableFuture<RpcResult<TransactionStatus>> convertToLegacyCommitFuture(
+ CheckedFuture<Void,TransactionCommitFailedException> from ) {
+ return Futures.transform(from, new AsyncFunction<Void, RpcResult<TransactionStatus>>() {
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> apply(Void input) throws Exception {
+ return Futures.immediateFuture(RpcResultBuilder.<TransactionStatus>
+ success(TransactionStatus.COMMITED).build());
+ }
+ } );
+ }
}
* <li> {@link DOMDataWriteTransaction#commit()} - results in invoking
* {@link DOMStoreWriteTransaction#ready()}, gathering all resulting cohorts
* and then invoking finalized implementation callback
- * {@link #commit(DOMDataWriteTransaction, Iterable)} with transaction which
+ * {@link #submit(DOMDataWriteTransaction, Iterable)} with transaction which
* was commited and gathered results.
* </ul>
*
* <li> {@link DOMDataWriteTransaction#commit()} - results in invoking
* {@link DOMStoreWriteTransaction#ready()}, gathering all resulting cohorts
* and then invoking finalized implementation callback
- * {@link #commit(DOMDataWriteTransaction, Iterable)} with transaction which
+ * {@link #submit(DOMDataWriteTransaction, Iterable)} with transaction which
* was commited and gathered results.
* <li>
* </ul>
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
public class DOMDataBrokerImpl extends AbstractDOMForwardedTransactionFactory<DOMStore> implements DOMDataBroker,
}
@Override
- public ListenableFuture<RpcResult<TransactionStatus>> commit(final DOMDataWriteTransaction transaction,
+ public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts);
return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> absent());
import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
/**
* NormalizedNode implementation of {@link org.opendaylight.controller.md.sal.common.api.data.TransactionChain} which is backed
}
@Override
- public synchronized ListenableFuture<RpcResult<TransactionStatus>> commit(
+ public synchronized CheckedFuture<Void,TransactionCommitFailedException> submit(
final DOMDataWriteTransaction transaction, final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> of(this));
}
import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
@Override
- public ListenableFuture<RpcResult<TransactionStatus>> submit(final DOMDataWriteTransaction transaction,
+ public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
Preconditions.checkArgument(listener != null, "Listener must not be null");
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
- ListenableFuture<RpcResult<TransactionStatus>> commitFuture = executor.submit(new CommitCoordinationTask(
+ ListenableFuture<Void> commitFuture = executor.submit(new CommitCoordinationTask(
transaction, cohorts, listener));
if (listener.isPresent()) {
Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
}
- return commitFuture;
+
+ return Futures.makeChecked(commitFuture, TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
}
/**
* support of cancelation.
*
*/
- private static class CommitCoordinationTask implements Callable<RpcResult<TransactionStatus>> {
+ private static class CommitCoordinationTask implements Callable<Void> {
private final DOMDataWriteTransaction tx;
private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
}
@Override
- public RpcResult<TransactionStatus> call() throws TransactionCommitFailedException {
+ public Void call() throws TransactionCommitFailedException {
try {
canCommitBlocking();
preCommitBlocking();
- return commitBlocking();
+ commitBlocking();
+ return null;
} catch (TransactionCommitFailedException e) {
LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), currentPhase, e);
abortBlocking(e);
* If one of cohorts failed preCommit
*
*/
- private RpcResult<TransactionStatus> commitBlocking() throws TransactionCommitFailedException {
+ private void commitBlocking() throws TransactionCommitFailedException {
commitAll().checkedGet();
- return RpcResultBuilder.<TransactionStatus>success(TransactionStatus.COMMITED).build();
}
/**
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
* callback is invoked with associated transaction and throwable is invoked on listener.
*
*/
-class DOMDataCommitErrorInvoker implements FutureCallback<RpcResult<TransactionStatus>> {
+class DOMDataCommitErrorInvoker implements FutureCallback<Void> {
private final DOMDataWriteTransaction tx;
private final DOMDataCommitErrorListener listener;
}
@Override
- public void onSuccess(RpcResult<TransactionStatus> result) {
+ public void onSuccess(Void result) {
// NOOP
}
}
\ No newline at end of file
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
/**
* Executor of Three Phase Commit coordination for
* subtransactoins.
* @param listener
* Error listener which should be notified if transaction failed.
- * @return ListenableFuture which contains RpcResult with
- * {@link TransactionStatus#COMMITED} if commit coordination on
- * cohorts finished successfully.
+ * @return a CheckedFuture. if commit coordination on cohorts finished successfully,
+ * nothing is returned from the Future, On failure,
+ * the Future fails with a {@link TransactionCommitFailedException}.
*
*/
- ListenableFuture<RpcResult<TransactionStatus>> submit(DOMDataWriteTransaction tx,
+ CheckedFuture<Void,TransactionCommitFailedException> submit(DOMDataWriteTransaction tx,
Iterable<DOMStoreThreePhaseCommitCohort> cohort, Optional<DOMDataCommitErrorListener> listener);
}
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
/**
*
public interface DOMDataCommitImplementation {
/**
- * User-supplied implementation of {@link DOMDataWriteTransaction#commit()}
+ * User-supplied implementation of {@link DOMDataWriteTransaction#submit()}
* for transaction.
*
- * Callback invoked when {@link DOMDataWriteTransaction#commit()} is invoked
+ * Callback invoked when {@link DOMDataWriteTransaction#submit()} is invoked
* on transaction created by this factory.
*
* @param transaction
* commited transaction.
*
*/
- ListenableFuture<RpcResult<TransactionStatus>> commit(final DOMDataWriteTransaction transaction,
+ CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
final Iterable<DOMStoreThreePhaseCommitCohort> cohorts);
}
* <li>{@link #merge(LogicalDatastoreType, InstanceIdentifier, NormalizedNode)}
* </ul>
* {@link #commit()} will result in invocation of
- * {@link DOMDataCommitImplementation#commit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
+ * {@link DOMDataCommitImplementation#submit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
* invocation with all {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort} for underlying
* transactions.
*
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
/**
* </ul>
* <p>
* {@link #commit()} will result in invocation of
- * {@link DOMDataCommitImplementation#commit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
+ * {@link DOMDataCommitImplementation#submit(org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction, Iterable)}
* invocation with all {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort} for underlying
* transactions.
*
*
*/
@GuardedBy("this")
- private volatile ListenableFuture<RpcResult<TransactionStatus>> commitFuture;
+ private volatile CheckedFuture<Void, TransactionCommitFailedException> commitFuture;
protected DOMForwardedWriteTransaction(final Object identifier,
final ImmutableMap<LogicalDatastoreType, T> backingTxs, final DOMDataCommitImplementation commitImpl) {
@Override
public synchronized ListenableFuture<RpcResult<TransactionStatus>> commit() {
+ return AbstractDataTransaction.convertToLegacyCommitFuture(submit());
+ }
+
+ @Override
+ public CheckedFuture<Void,TransactionCommitFailedException> submit() {
checkNotReady();
ImmutableList.Builder<DOMStoreThreePhaseCommitCohort> cohortsBuilder = ImmutableList.builder();
cohortsBuilder.add(subTx.ready());
}
ImmutableList<DOMStoreThreePhaseCommitCohort> cohorts = cohortsBuilder.build();
- commitFuture = commitImpl.commit(this, cohorts);
+ commitFuture = commitImpl.submit(this, cohorts);
/*
*We remove reference to Commit Implementation in order
private void checkNotCommited() {
checkState(commitFuture == null, "Transaction was already submited.");
}
-
}
\ No newline at end of file
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
public Future<RpcResult<TransactionStatus>> commit() {
Preconditions.checkState(status == TransactionStatus.NEW);
status = TransactionStatus.SUBMITED;
- return getDelegate().commit();
+ return AbstractDataTransaction.convertToLegacyCommitFuture(getDelegate().submit());
}
@Override
public List<ListenableFuture<?>> call() throws Exception {
List<ListenableFuture<?>> builder = new ArrayList<>(txNum);
for (DOMDataReadWriteTransaction tx :transactions) {
- builder.add(tx.commit());
+ builder.add(tx.submit());
}
return builder;
}
measure("Txs:1 Submit", new Callable<ListenableFuture<?>>() {
@Override
public ListenableFuture<?> call() throws Exception {
- return writeTx.commit();
+ return writeTx.submit();
}
}).get();
return null;
TestModel.TEST_PATH);
assertTrue(writeTxContainer.get().isPresent());
- writeTx.commit().get();
+ writeTx.submit().get();
Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
.read(OPERATIONAL, TestModel.TEST_PATH).get();
*/
package org.opendaylight.controller.md.sal.dom.broker.impl;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
-import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
* First transaction is marked as ready, we are able to allocate chained
* transactions
*/
- ListenableFuture<RpcResult<TransactionStatus>> firstWriteTxFuture = firstTx.commit();
+ ListenableFuture<Void> firstWriteTxFuture = firstTx.submit();
/**
* We alocate chained transaction - read transaction.
/**
* third transaction is sealed and commited
*/
- ListenableFuture<RpcResult<TransactionStatus>> thirdDeleteTxFuture = thirdDeleteTx.commit();
+ ListenableFuture<Void> thirdDeleteTxFuture = thirdDeleteTx.submit();
assertCommitSuccessful(thirdDeleteTxFuture);
/**
return tx;
}
- private static void assertCommitSuccessful(final ListenableFuture<RpcResult<TransactionStatus>> future)
+ private static void assertCommitSuccessful(final ListenableFuture<Void> future)
throws InterruptedException, ExecutionException {
- RpcResult<TransactionStatus> rpcResult = future.get();
- assertTrue(rpcResult.isSuccessful());
- assertEquals(TransactionStatus.COMMITED, rpcResult.getResult());
+ future.get();
}
private static void assertTestContainerExists(final DOMDataReadTransaction readTx) throws InterruptedException,
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.DisplayString;
import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.toaster.rev091120.MakeToastInput;
executor.shutdown();
if (dataProvider != null) {
- WriteTransaction t = dataProvider.newWriteOnlyTransaction();
- t.delete(LogicalDatastoreType.OPERATIONAL,TOASTER_IID);
- ListenableFuture<RpcResult<TransactionStatus>> future = t.commit();
- Futures.addCallback( future, new FutureCallback<RpcResult<TransactionStatus>>() {
+ WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
+ tx.delete(LogicalDatastoreType.OPERATIONAL,TOASTER_IID);
+ Futures.addCallback( tx.submit(), new FutureCallback<Void>() {
@Override
- public void onSuccess( final RpcResult<TransactionStatus> result ) {
+ public void onSuccess( final Void result ) {
LOG.debug( "Delete Toaster commit result: " + result );
}
final SettableFuture<RpcResult<Void>> futureResult = SettableFuture.create();
- checkStatusAndMakeToast( input, futureResult );
+ checkStatusAndMakeToast( input, futureResult, 2 );
return futureResult;
}
}
private void checkStatusAndMakeToast( final MakeToastInput input,
- final SettableFuture<RpcResult<Void>> futureResult ) {
+ final SettableFuture<RpcResult<Void>> futureResult,
+ final int tries ) {
// Read the ToasterStatus and, if currently Up, try to write the status to Down.
// If that succeeds, then we essentially have an exclusive lock and can proceed
ListenableFuture<Optional<Toaster>> readFuture =
tx.read( LogicalDatastoreType.OPERATIONAL, TOASTER_IID );
- final ListenableFuture<RpcResult<TransactionStatus>> commitFuture =
- Futures.transform( readFuture, new AsyncFunction<Optional<Toaster>,
- RpcResult<TransactionStatus>>() {
+ final ListenableFuture<Void> commitFuture =
+ Futures.transform( readFuture, new AsyncFunction<Optional<Toaster>,Void>() {
@Override
- public ListenableFuture<RpcResult<TransactionStatus>> apply(
+ public ListenableFuture<Void> apply(
final Optional<Toaster> toasterData ) throws Exception {
ToasterStatus toasterStatus = ToasterStatus.Up;
if( outOfBread() ) {
LOG.debug( "Toaster is out of bread" );
- return Futures.immediateFuture( RpcResultBuilder.<TransactionStatus>failed()
- .withRpcError( makeToasterOutOfBreadError() ).build() );
+ return Futures.immediateFailedCheckedFuture(
+ new TransactionCommitFailedException( "", makeToasterOutOfBreadError() ) );
}
LOG.debug( "Setting Toaster status to Down" );
// concurrent toasting.
tx.put( LogicalDatastoreType.OPERATIONAL, TOASTER_IID,
buildToaster( ToasterStatus.Down ) );
- return tx.commit();
+ return tx.submit();
}
LOG.debug( "Oops - already making toast!" );
// Return an error since we are already making toast. This will get
// propagated to the commitFuture below which will interpret the null
// TransactionStatus in the RpcResult as an error condition.
- return Futures.immediateFuture( RpcResultBuilder.<TransactionStatus>failed()
- .withRpcError( makeToasterInUseError() ).build() );
+ return Futures.immediateFailedCheckedFuture(
+ new TransactionCommitFailedException( "", makeToasterInUseError() ) );
}
} );
- Futures.addCallback( commitFuture, new FutureCallback<RpcResult<TransactionStatus>>() {
+ Futures.addCallback( commitFuture, new FutureCallback<Void>() {
@Override
- public void onSuccess( final RpcResult<TransactionStatus> result ) {
- if( result.getResult() == TransactionStatus.COMMITED ) {
-
- // OK to make toast
- currentMakeToastTask.set( executor.submit(
- new MakeToastTask( input, futureResult ) ) );
- } else {
-
- LOG.debug( "Setting error result" );
-
- // Either the transaction failed to commit for some reason or, more likely,
- // the read above returned ToasterStatus.Down. Either way, fail the
- // futureResult and copy the errors.
-
- futureResult.set( RpcResultBuilder.<Void>failed().withRpcErrors(
- result.getErrors() ).build() );
- }
+ public void onSuccess( final Void result ) {
+ // OK to make toast
+ currentMakeToastTask.set( executor.submit( new MakeToastTask( input, futureResult ) ) );
}
@Override
// status before us. Try reading the status again - if another make toast is
// now in progress, we should get ToasterStatus.Down and fail.
- LOG.debug( "Got OptimisticLockFailedException - trying again" );
+ if( ( tries - 1 ) > 0 ) {
+ LOG.debug( "Got OptimisticLockFailedException - trying again" );
- checkStatusAndMakeToast( input, futureResult );
+ checkStatusAndMakeToast( input, futureResult, tries - 1 );
+ }
+ else {
+ futureResult.set( RpcResultBuilder.<Void> failed()
+ .withError( ErrorType.APPLICATION, ex.getMessage() ).build() );
+ }
} else {
- LOG.error( "Failed to commit Toaster status", ex );
+ LOG.debug( "Failed to commit Toaster status", ex );
- // Got some unexpected error so fail.
+ // Probably already making toast.
futureResult.set( RpcResultBuilder.<Void> failed()
- .withError( ErrorType.APPLICATION, ex.getMessage() ).build() );
+ .withRpcErrors( ((TransactionCommitFailedException)ex).getErrorList() )
+ .build() );
}
}
} );
WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
tx.put( LogicalDatastoreType.OPERATIONAL,TOASTER_IID, buildToaster( ToasterStatus.Up ) );
- ListenableFuture<RpcResult<TransactionStatus>> commitFuture = tx.commit();
-
- Futures.addCallback( commitFuture, new FutureCallback<RpcResult<TransactionStatus>>() {
+ Futures.addCallback( tx.submit(), new FutureCallback<Void>() {
@Override
- public void onSuccess( final RpcResult<TransactionStatus> result ) {
- if( result.getResult() != TransactionStatus.COMMITED ) {
- LOG.error( "Failed to update toaster status: " + result.getErrors() );
- }
-
- notifyCallback( result.getResult() == TransactionStatus.COMMITED );
+ public void onSuccess( final Void result ) {
+ notifyCallback( true );
}
@Override