From: Tom Pantelis Date: Sun, 29 Apr 2018 22:51:36 +0000 (-0400) Subject: Implement AsyncWriteTransaction.commit() X-Git-Tag: release/fluorine~82^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=16d0b397db62207e42648e6298b7c826a4858b77;p=netconf.git Implement AsyncWriteTransaction.commit() Implement the new commit() method which uses FluentFuture instead of the deprecated CheckedFuture. The commit() method is defaulted in the interface but, as soon as all implementations implement commit(), the deprecated submit() method will be defaulted instead. Change-Id: If053be0d3864513685a231a320a7f47931f76684 Signed-off-by: Tom Pantelis --- diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java index e0075476e7..5cd2418312 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java @@ -13,11 +13,17 @@ import akka.actor.ActorSystem; import akka.util.Timeout; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.MoreExecutors; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.mdsal.common.api.MappingCheckedFuture; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.yangtools.util.concurrent.ExceptionMapper; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -69,7 +75,18 @@ public class ProxyReadWriteTransaction implements DOMDataReadWriteTransaction { @Override public CheckedFuture submit() { - return delegateWrite.submit(getIdentifier()); + return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()), + new ExceptionMapper("commit", TransactionCommitFailedException.class) { + @Override + protected TransactionCommitFailedException newWithCause(String message, Throwable cause) { + return new TransactionCommitFailedException(message, cause); + } + }); + } + + @Override + public @NonNull FluentFuture commit() { + return delegateWrite.commit(getIdentifier()); } @Override diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java index 703cfce42c..ccd58fb9e2 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java @@ -13,15 +13,14 @@ import akka.actor.ActorSystem; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; -import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.concurrent.atomic.AtomicBoolean; -import javax.annotation.Nullable; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.mdsal.common.api.CommitInfo; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils; import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; @@ -74,46 +73,46 @@ public class ProxyWriteAdapter { } } - public CheckedFuture submit(final Object identifier) { + public @NonNull FluentFuture commit(final Object identifier) { if (!opened.compareAndSet(true, false)) { throw new IllegalStateException(id + ": Transaction" + identifier + " is closed"); } final Future submitScalaFuture = Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout); - LOG.trace("{}: Submit {} via NETCONF", id); + LOG.trace("{}: Commit {} via NETCONF", id); - final SettableFuture settableFuture = SettableFuture.create(); + final SettableFuture settableFuture = SettableFuture.create(); submitScalaFuture.onComplete(new OnComplete() { @Override public void onComplete(final Throwable failure, final Object success) throws Throwable { if (failure != null) { // ask timeout - final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id); - settableFuture.setException(exception); + settableFuture.setException(newTransactionCommitFailedException( + NetconfTopologyUtils.createMasterIsDownException(id), identifier)); return; } if (success instanceof Throwable) { - settableFuture.setException((Throwable) success); + settableFuture.setException(newTransactionCommitFailedException((Throwable) success, identifier)); } else { if (success instanceof SubmitFailedReply) { LOG.error("{}: Transaction was not submitted because already closed.", id); - settableFuture.setException(((SubmitFailedReply) success).getThrowable()); + settableFuture.setException(newTransactionCommitFailedException( + ((SubmitFailedReply) success).getThrowable(), identifier)); return; } - settableFuture.set(null); + settableFuture.set(CommitInfo.empty()); } } }, actorSystem.dispatcher()); - return Futures.makeChecked(settableFuture, new Function() { - @Nullable - @Override - public TransactionCommitFailedException apply(@Nullable final Exception input) { - final String message = "Submit of transaction " + identifier + " failed"; - return new TransactionCommitFailedException(message, input); - } - }); + return FluentFuture.from(settableFuture); + } + + private static TransactionCommitFailedException newTransactionCommitFailedException(final Throwable failure, + final Object identifier) { + return new TransactionCommitFailedException( + String.format("Commit of transaction %s failed", identifier), failure); } public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) { diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java index f2ed8b7e04..d2840235e9 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java @@ -12,10 +12,16 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.util.Timeout; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.MoreExecutors; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.mdsal.common.api.MappingCheckedFuture; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.yangtools.util.concurrent.ExceptionMapper; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -48,7 +54,18 @@ public class ProxyWriteTransaction implements DOMDataWriteTransaction { @Override public CheckedFuture submit() { - return proxyWriteAdapter.submit(getIdentifier()); + return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()), + new ExceptionMapper("commit", TransactionCommitFailedException.class) { + @Override + protected TransactionCommitFailedException newWithCause(String message, Throwable cause) { + return new TransactionCommitFailedException(message, cause); + } + }); + } + + @Override + public @NonNull FluentFuture commit() { + return proxyWriteAdapter.commit(getIdentifier()); } @Override diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/AbstractWriteTx.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/AbstractWriteTx.java index cc0d037e00..60bf213f88 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/AbstractWriteTx.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/AbstractWriteTx.java @@ -11,22 +11,29 @@ package org.opendaylight.netconf.sal.connect.netconf.sal.tx; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; +import java.util.Collection; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import javax.annotation.Nonnull; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.mdsal.common.api.MappingCheckedFuture; import org.opendaylight.netconf.api.DocumentedException; import org.opendaylight.netconf.api.NetconfDocumentedException; import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.yangtools.util.concurrent.ExceptionMapper; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; @@ -146,6 +153,44 @@ public abstract class AbstractWriteTx implements DOMDataWriteTransaction { editStructure, Optional.of(ModifyAction.NONE), "delete"); } + @Override + public CheckedFuture submit() { + return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()), + new ExceptionMapper("commit", TransactionCommitFailedException.class) { + @Override + protected TransactionCommitFailedException newWithCause(String message, Throwable cause) { + return new TransactionCommitFailedException(message, cause); + } + }); + } + + @Override + public @NonNull FluentFuture commit() { + final SettableFuture resultFuture = SettableFuture.create(); + Futures.addCallback(commitConfiguration(), new FutureCallback>() { + @Override + public void onSuccess(RpcResult result) { + if (!result.isSuccessful()) { + final Collection errors = result.getErrors(); + resultFuture.setException(new TransactionCommitFailedException( + String.format("Commit of transaction %s failed", getIdentifier()), + errors.toArray(new RpcError[errors.size()]))); + return; + } + + resultFuture.set(CommitInfo.empty()); + } + + @Override + public void onFailure(Throwable failure) { + resultFuture.setException(new TransactionCommitFailedException( + String.format("Commit of transaction %s failed", getIdentifier()), failure)); + } + }, MoreExecutors.directExecutor()); + + return FluentFuture.from(resultFuture); + } + protected final ListenableFuture> commitConfiguration() { listeners.forEach(listener -> listener.onTransactionSubmitted(this)); checkNotFinished(); diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/ReadWriteTx.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/ReadWriteTx.java index 1c44b40f84..02803c6a7a 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/ReadWriteTx.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/ReadWriteTx.java @@ -10,12 +10,15 @@ package org.opendaylight.netconf.sal.connect.netconf.sal.tx; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FluentFuture; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.mdsal.common.api.CommitInfo; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -56,6 +59,11 @@ public class ReadWriteTx implements DOMDataReadWriteTransaction { return delegateWriteTx.submit(); } + @Override + public @NonNull FluentFuture commit() { + return delegateWriteTx.commit(); + } + @Override public CheckedFuture>, ReadFailedException> read( final LogicalDatastoreType store, final YangInstanceIdentifier path) { diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteCandidateTx.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteCandidateTx.java index ac24ac8f35..23652e767b 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteCandidateTx.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteCandidateTx.java @@ -9,15 +9,12 @@ package org.opendaylight.netconf.sal.connect.netconf.sal.tx; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps; import org.opendaylight.netconf.sal.connect.netconf.util.NetconfRpcFutureCallback; @@ -93,18 +90,6 @@ public class WriteCandidateTx extends AbstractWriteTx { cleanupOnSuccess(); } - @Override - public synchronized CheckedFuture submit() { - final ListenableFuture commitFutureAsVoid = Futures.transform(commitConfiguration(), input -> { - Preconditions.checkArgument(input.isSuccessful() && input.getErrors().isEmpty(), - "Submit failed with errors: %s", input.getErrors()); - return null; - }, MoreExecutors.directExecutor()); - - return Futures.makeChecked(commitFutureAsVoid, input -> new TransactionCommitFailedException( - "Submit of transaction " + getIdentifier() + " failed", input)); - } - /** * This has to be non blocking since it is called from a callback on commit * and its netty threadpool that is really sensitive to blocking calls. diff --git a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteRunningTx.java b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteRunningTx.java index 6b1999f74e..f843340b25 100644 --- a/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteRunningTx.java +++ b/netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteRunningTx.java @@ -8,15 +8,10 @@ package org.opendaylight.netconf.sal.connect.netconf.sal.tx; -import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.List; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps; import org.opendaylight.netconf.sal.connect.netconf.util.NetconfRpcFutureCallback; @@ -70,16 +65,6 @@ public class WriteRunningTx extends AbstractWriteTx { unlock(); } - @Override - public synchronized CheckedFuture submit() { - final ListenableFuture commmitFutureAsVoid = Futures.transform(commitConfiguration(), - (Function, Void>) input -> null, MoreExecutors.directExecutor()); - - return Futures.makeChecked(commmitFutureAsVoid, - input -> new TransactionCommitFailedException("Submit of transaction " + getIdentifier() + " failed", - input)); - } - @Override public synchronized ListenableFuture> performCommit() { for (final Change change : changes) {