Implement AsyncWriteTransaction.commit() 45/71545/3
authorTom Pantelis <tompantelis@gmail.com>
Sun, 29 Apr 2018 22:51:36 +0000 (18:51 -0400)
committerTom Pantelis <tompantelis@gmail.com>
Mon, 30 Apr 2018 00:28:07 +0000 (20:28 -0400)
Implement the new commit() method which uses FluentFuture
instead of the deprecated CheckedFuture. The commit() method
is defaulted in the interface but, as soon as all implementations
implement commit(), the deprecated submit() method will be defaulted
instead.

Change-Id: If053be0d3864513685a231a320a7f47931f76684
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyReadWriteTransaction.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteAdapter.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/ProxyWriteTransaction.java
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/AbstractWriteTx.java
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/ReadWriteTx.java
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteCandidateTx.java
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/tx/WriteRunningTx.java

index e0075476e7769db1640c583134e528810fb2c88d..5cd241831252d965b25837ed1681b37ba9e8d8f8 100644 (file)
@@ -13,11 +13,17 @@ import akka.actor.ActorSystem;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
@@ -69,7 +75,18 @@ public class ProxyReadWriteTransaction implements DOMDataReadWriteTransaction {
 
     @Override
     public CheckedFuture<Void, TransactionCommitFailedException> submit() {
-        return delegateWrite.submit(getIdentifier());
+        return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()),
+            new ExceptionMapper<TransactionCommitFailedException>("commit", TransactionCommitFailedException.class) {
+                @Override
+                protected TransactionCommitFailedException newWithCause(String message, Throwable cause) {
+                    return new TransactionCommitFailedException(message, cause);
+                }
+            });
+    }
+
+    @Override
+    public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+        return delegateWrite.commit(getIdentifier());
     }
 
     @Override
index 703cfce42ccdbbae5392d1df6812b98524ebdfaf..ccd58fb9e2a6c8510f617ef6d63d10fd18ae33e0 100644 (file)
@@ -13,15 +13,14 @@ import akka.actor.ActorSystem;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
@@ -74,46 +73,46 @@ public class ProxyWriteAdapter {
         }
     }
 
-    public CheckedFuture<Void, TransactionCommitFailedException> submit(final Object identifier) {
+    public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit(final Object identifier) {
         if (!opened.compareAndSet(true, false)) {
             throw new IllegalStateException(id + ": Transaction" + identifier + " is closed");
         }
         final Future<Object> submitScalaFuture =
                 Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
 
-        LOG.trace("{}: Submit {} via NETCONF", id);
+        LOG.trace("{}: Commit {} via NETCONF", id);
 
-        final SettableFuture<Void> settableFuture = SettableFuture.create();
+        final SettableFuture<CommitInfo> settableFuture = SettableFuture.create();
         submitScalaFuture.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(final Throwable failure, final Object success) throws Throwable {
                 if (failure != null) { // ask timeout
-                    final Exception exception = NetconfTopologyUtils.createMasterIsDownException(id);
-                    settableFuture.setException(exception);
+                    settableFuture.setException(newTransactionCommitFailedException(
+                            NetconfTopologyUtils.createMasterIsDownException(id), identifier));
                     return;
                 }
                 if (success instanceof Throwable) {
-                    settableFuture.setException((Throwable) success);
+                    settableFuture.setException(newTransactionCommitFailedException((Throwable) success, identifier));
                 } else {
                     if (success instanceof SubmitFailedReply) {
                         LOG.error("{}: Transaction was not submitted because already closed.", id);
-                        settableFuture.setException(((SubmitFailedReply) success).getThrowable());
+                        settableFuture.setException(newTransactionCommitFailedException(
+                                ((SubmitFailedReply) success).getThrowable(), identifier));
                         return;
                     }
 
-                    settableFuture.set(null);
+                    settableFuture.set(CommitInfo.empty());
                 }
             }
         }, actorSystem.dispatcher());
 
-        return Futures.makeChecked(settableFuture, new Function<Exception, TransactionCommitFailedException>() {
-            @Nullable
-            @Override
-            public TransactionCommitFailedException apply(@Nullable final Exception input) {
-                final String message = "Submit of transaction " + identifier + " failed";
-                return new TransactionCommitFailedException(message, input);
-            }
-        });
+        return FluentFuture.from(settableFuture);
+    }
+
+    private static TransactionCommitFailedException newTransactionCommitFailedException(final Throwable failure,
+            final Object identifier) {
+        return new TransactionCommitFailedException(
+                String.format("Commit of transaction %s failed", identifier), failure);
     }
 
     public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier identifier) {
index f2ed8b7e04a141a1014f0a0ee87aaa658158daff..d2840235e9284c9182226a10739d13439570a079 100644 (file)
@@ -12,10 +12,16 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
@@ -48,7 +54,18 @@ public class ProxyWriteTransaction implements DOMDataWriteTransaction {
 
     @Override
     public CheckedFuture<Void, TransactionCommitFailedException> submit() {
-        return proxyWriteAdapter.submit(getIdentifier());
+        return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()),
+            new ExceptionMapper<TransactionCommitFailedException>("commit", TransactionCommitFailedException.class) {
+                @Override
+                protected TransactionCommitFailedException newWithCause(String message, Throwable cause) {
+                    return new TransactionCommitFailedException(message, cause);
+                }
+            });
+    }
+
+    @Override
+    public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+        return proxyWriteAdapter.commit(getIdentifier());
     }
 
     @Override
index cc0d037e00e694ec1b05e7b7786b83c03285a095..60bf213f88880b4575aa80e4d6141d4005a6e15f 100644 (file)
@@ -11,22 +11,29 @@ package org.opendaylight.netconf.sal.connect.netconf.sal.tx;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import javax.annotation.Nonnull;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
 import org.opendaylight.netconf.api.DocumentedException;
 import org.opendaylight.netconf.api.NetconfDocumentedException;
 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -146,6 +153,44 @@ public abstract class AbstractWriteTx implements DOMDataWriteTransaction {
                 editStructure, Optional.of(ModifyAction.NONE), "delete");
     }
 
+    @Override
+    public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+        return MappingCheckedFuture.create(commit().transform(ignored -> null, MoreExecutors.directExecutor()),
+            new ExceptionMapper<TransactionCommitFailedException>("commit", TransactionCommitFailedException.class) {
+                @Override
+                protected TransactionCommitFailedException newWithCause(String message, Throwable cause) {
+                    return new TransactionCommitFailedException(message, cause);
+                }
+            });
+    }
+
+    @Override
+    public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+        final SettableFuture<CommitInfo> resultFuture = SettableFuture.create();
+        Futures.addCallback(commitConfiguration(), new FutureCallback<RpcResult<Void>>() {
+            @Override
+            public void onSuccess(RpcResult<Void> result) {
+                if (!result.isSuccessful()) {
+                    final Collection<RpcError> errors = result.getErrors();
+                    resultFuture.setException(new TransactionCommitFailedException(
+                        String.format("Commit of transaction %s failed", getIdentifier()),
+                            errors.toArray(new RpcError[errors.size()])));
+                    return;
+                }
+
+                resultFuture.set(CommitInfo.empty());
+            }
+
+            @Override
+            public void onFailure(Throwable failure) {
+                resultFuture.setException(new TransactionCommitFailedException(
+                        String.format("Commit of transaction %s failed", getIdentifier()), failure));
+            }
+        }, MoreExecutors.directExecutor());
+
+        return FluentFuture.from(resultFuture);
+    }
+
     protected final ListenableFuture<RpcResult<Void>> commitConfiguration() {
         listeners.forEach(listener -> listener.onTransactionSubmitted(this));
         checkNotFinished();
index 1c44b40f841712bce44169e702ab7cdc5330a656..02803c6a7ae95ea7739f93855138222b4f11c796 100644 (file)
@@ -10,12 +10,15 @@ package org.opendaylight.netconf.sal.connect.netconf.sal.tx;
 
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
@@ -56,6 +59,11 @@ public class ReadWriteTx implements DOMDataReadWriteTransaction {
         return delegateWriteTx.submit();
     }
 
+    @Override
+    public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
+        return delegateWriteTx.commit();
+    }
+
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
             final LogicalDatastoreType store, final YangInstanceIdentifier path) {
index ac24ac8f35acd592e2a855ba671a14b4bc25cda1..23652e767b319eda402d460c336d25fc72d01750 100644 (file)
@@ -9,15 +9,12 @@
 package org.opendaylight.netconf.sal.connect.netconf.sal.tx;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps;
 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfRpcFutureCallback;
@@ -93,18 +90,6 @@ public class WriteCandidateTx extends AbstractWriteTx {
         cleanupOnSuccess();
     }
 
-    @Override
-    public synchronized CheckedFuture<Void, TransactionCommitFailedException> submit() {
-        final ListenableFuture<Void> commitFutureAsVoid = Futures.transform(commitConfiguration(), input -> {
-            Preconditions.checkArgument(input.isSuccessful() && input.getErrors().isEmpty(),
-                "Submit failed with errors: %s", input.getErrors());
-            return null;
-        }, MoreExecutors.directExecutor());
-
-        return Futures.makeChecked(commitFutureAsVoid, input -> new TransactionCommitFailedException(
-                "Submit of transaction " + getIdentifier() + " failed", input));
-    }
-
     /**
      * This has to be non blocking since it is called from a callback on commit
      * and its netty threadpool that is really sensitive to blocking calls.
index 6b1999f74e9192ef9fea27fb609c193021e795f3..f843340b25109ff02619637c24da3165c992a859 100644 (file)
@@ -8,15 +8,10 @@
 
 package org.opendaylight.netconf.sal.connect.netconf.sal.tx;
 
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.List;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps;
 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfRpcFutureCallback;
@@ -70,16 +65,6 @@ public class WriteRunningTx extends AbstractWriteTx {
         unlock();
     }
 
-    @Override
-    public synchronized CheckedFuture<Void, TransactionCommitFailedException> submit() {
-        final ListenableFuture<Void> commmitFutureAsVoid = Futures.transform(commitConfiguration(),
-                (Function<RpcResult<Void>, Void>) input -> null, MoreExecutors.directExecutor());
-
-        return Futures.makeChecked(commmitFutureAsVoid,
-            input -> new TransactionCommitFailedException("Submit of transaction " + getIdentifier() + " failed",
-                input));
-    }
-
     @Override
     public synchronized ListenableFuture<RpcResult<Void>> performCommit() {
         for (final Change change : changes) {