Migrate getResult()/getErrors() callers
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfDataTreeServiceActor.java
index 1297b88eb50d001892e23be15758069d10295592..dff990f8d74325225a74204526dd0ba47a0c03e9 100644 (file)
@@ -12,14 +12,14 @@ import akka.actor.Props;
 import akka.actor.ReceiveTimeout;
 import akka.actor.Status;
 import akka.actor.UntypedAbstractActor;
-import com.google.common.util.concurrent.FluentFuture;
+import akka.util.JavaDurationConverters;
 import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import java.util.ArrayList;
-import java.util.List;
+import java.time.Duration;
 import java.util.Optional;
-import org.opendaylight.mdsal.common.api.CommitInfo;
+import java.util.function.Supplier;
 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
@@ -36,12 +36,13 @@ import org.opendaylight.netconf.topology.singleton.messages.netconf.MergeEditCon
 import org.opendaylight.netconf.topology.singleton.messages.netconf.RemoveEditConfigRequest;
 import org.opendaylight.netconf.topology.singleton.messages.netconf.ReplaceEditConfigRequest;
 import org.opendaylight.netconf.topology.singleton.messages.netconf.UnlockRequest;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
 
 public final class NetconfDataTreeServiceActor extends UntypedAbstractActor {
     private static final Logger LOG = LoggerFactory.getLogger(NetconfDataTreeServiceActor.class);
@@ -49,13 +50,11 @@ public final class NetconfDataTreeServiceActor extends UntypedAbstractActor {
     private final NetconfDataTreeService netconfService;
     private final long idleTimeout;
 
-    private List<ListenableFuture<? extends DOMRpcResult>> resultsFutures = new ArrayList<>();
-
     private NetconfDataTreeServiceActor(final NetconfDataTreeService netconfService, final Duration idleTimeout) {
         this.netconfService = netconfService;
         this.idleTimeout = idleTimeout.toSeconds();
         if (this.idleTimeout > 0) {
-            context().setReceiveTimeout(idleTimeout);
+            context().setReceiveTimeout(JavaDurationConverters.asFiniteDuration(idleTimeout));
         }
     }
 
@@ -66,74 +65,64 @@ public final class NetconfDataTreeServiceActor extends UntypedAbstractActor {
 
     @Override
     public void onReceive(final Object message) {
-        if (message instanceof GetWithFieldsRequest) {
-            final GetWithFieldsRequest getRequest = (GetWithFieldsRequest) message;
+        if (message instanceof GetWithFieldsRequest getRequest) {
             final YangInstanceIdentifier path = getRequest.getPath();
-            final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = netconfService.get(
+            final ListenableFuture<Optional<NormalizedNode>> future = netconfService.get(
                     getRequest.getPath(), getRequest.getFields());
             context().stop(self());
             sendResult(future, path, sender(), self());
-        } else if (message instanceof GetRequest) {
-            final GetRequest getRequest = (GetRequest) message;
+        } else if (message instanceof GetRequest getRequest) {
             final YangInstanceIdentifier path = getRequest.getPath();
-            final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = netconfService.get(path);
+            final ListenableFuture<Optional<NormalizedNode>> future = netconfService.get(path);
             context().stop(self());
             sendResult(future, path, sender(), self());
-        } else if (message instanceof GetConfigWithFieldsRequest) {
-            final GetConfigWithFieldsRequest getConfigRequest = (GetConfigWithFieldsRequest) message;
+        } else if (message instanceof GetConfigWithFieldsRequest getConfigRequest) {
             final YangInstanceIdentifier path = getConfigRequest.getPath();
-            final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = netconfService.getConfig(
+            final ListenableFuture<Optional<NormalizedNode>> future = netconfService.getConfig(
                     path, getConfigRequest.getFields());
             context().stop(self());
             sendResult(future, path, sender(), self());
-        } else if (message instanceof GetConfigRequest) {
-            final GetConfigRequest getConfigRequest = (GetConfigRequest) message;
+        } else if (message instanceof GetConfigRequest getConfigRequest) {
             final YangInstanceIdentifier path = getConfigRequest.getPath();
-            final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = netconfService.getConfig(path);
+            final ListenableFuture<Optional<NormalizedNode>> future = netconfService.getConfig(path);
             context().stop(self());
             sendResult(future, path, sender(), self());
         } else if (message instanceof LockRequest) {
-            resultsFutures.addAll(netconfService.lock());
-        } else if (message instanceof MergeEditConfigRequest) {
-            final MergeEditConfigRequest request = (MergeEditConfigRequest) message;
-            resultsFutures.add(netconfService.merge(
+            invokeRpcCall(netconfService::lock, sender(), self());
+        } else if (message instanceof MergeEditConfigRequest request) {
+            netconfService.merge(
                 request.getStore(),
                 request.getNormalizedNodeMessage().getIdentifier(),
                 request.getNormalizedNodeMessage().getNode(),
-                Optional.ofNullable(request.getDefaultOperation())));
-        } else if (message instanceof ReplaceEditConfigRequest) {
-            final ReplaceEditConfigRequest request = (ReplaceEditConfigRequest) message;
-            resultsFutures.add(netconfService.replace(
+                Optional.ofNullable(request.getDefaultOperation()));
+        } else if (message instanceof ReplaceEditConfigRequest request) {
+            netconfService.replace(
                 request.getStore(),
                 request.getNormalizedNodeMessage().getIdentifier(),
                 request.getNormalizedNodeMessage().getNode(),
-                Optional.ofNullable(request.getDefaultOperation())));
-        } else if (message instanceof CreateEditConfigRequest) {
-            final CreateEditConfigRequest request = (CreateEditConfigRequest) message;
-            resultsFutures.add(netconfService.create(
+                Optional.ofNullable(request.getDefaultOperation()));
+        } else if (message instanceof CreateEditConfigRequest request) {
+            netconfService.create(
                 request.getStore(),
                 request.getNormalizedNodeMessage().getIdentifier(),
                 request.getNormalizedNodeMessage().getNode(),
-                Optional.ofNullable(request.getDefaultOperation())));
-        } else if (message instanceof DeleteEditConfigRequest) {
-            final DeleteEditConfigRequest request = (DeleteEditConfigRequest) message;
-            resultsFutures.add(netconfService.delete(request.getStore(), request.getPath()));
-        } else if (message instanceof RemoveEditConfigRequest) {
-            final RemoveEditConfigRequest request = (RemoveEditConfigRequest) message;
-            resultsFutures.add(netconfService.remove(request.getStore(), request.getPath()));
+                Optional.ofNullable(request.getDefaultOperation()));
+        } else if (message instanceof DeleteEditConfigRequest request) {
+            netconfService.delete(request.getStore(), request.getPath());
+        } else if (message instanceof RemoveEditConfigRequest request) {
+            netconfService.remove(request.getStore(), request.getPath());
         } else if (message instanceof CommitRequest) {
-            context().stop(self());
             submit(sender(), self());
         } else if (message instanceof DiscardChangesRequest) {
-            netconfService.discardChanges();
+            invokeRpcCall(netconfService::discardChanges, sender(), self());
         } else if (message instanceof UnlockRequest) {
             context().stop(self());
-            netconfService.unlock();
+            invokeRpcCall(netconfService::unlock, sender(), self());
         } else if (message instanceof ReceiveTimeout) {
             LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
                 idleTimeout);
-            netconfService.discardChanges();
-            netconfService.unlock();
+            invokeRpcCall(netconfService::discardChanges, sender(), self());
+            invokeRpcCall(netconfService::unlock, sender(), self());
             context().stop(self());
         } else {
             unhandled(message);
@@ -141,11 +130,41 @@ public final class NetconfDataTreeServiceActor extends UntypedAbstractActor {
     }
 
     private void submit(final ActorRef requester, final ActorRef self) {
-        final ListenableFuture<? extends CommitInfo> submitFuture = netconfService.commit(resultsFutures);
-        FluentFuture.from(submitFuture).addCallback(new FutureCallback<CommitInfo>() {
+        Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
+            @Override
+            public void onSuccess(final DOMRpcResult result) {
+                if (result == null) {
+                    requester.tell(new EmptyResultResponse(), getSender());
+                    return;
+                }
+                NormalizedNodeMessage nodeMessageResp = null;
+                if (result.value() != null) {
+                    nodeMessageResp = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), result.value());
+                }
+                requester.tell(new InvokeRpcMessageReply(nodeMessageResp, result.errors()), self);
+            }
+
+            @Override
+            public void onFailure(final Throwable throwable) {
+                requester.tell(new Status.Failure(throwable), self);
+            }
+        }, MoreExecutors.directExecutor());
+    }
+
+    private void invokeRpcCall(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation,
+        final ActorRef requester, final ActorRef self) {
+        Futures.addCallback(operation.get(), new FutureCallback<DOMRpcResult>() {
             @Override
-            public void onSuccess(final CommitInfo result) {
-                requester.tell(new Status.Success(null), self);
+            public void onSuccess(final DOMRpcResult rpcResult) {
+                if (rpcResult == null) {
+                    requester.tell(new EmptyResultResponse(), getSender());
+                    return;
+                }
+                NormalizedNodeMessage nodeMessageResp = null;
+                if (rpcResult.value() != null) {
+                    nodeMessageResp = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), rpcResult.value());
+                }
+                requester.tell(new InvokeRpcMessageReply(nodeMessageResp, rpcResult.errors()), self);
             }
 
             @Override
@@ -155,13 +174,12 @@ public final class NetconfDataTreeServiceActor extends UntypedAbstractActor {
         }, MoreExecutors.directExecutor());
     }
 
-    private void sendResult(final ListenableFuture<Optional<NormalizedNode<?, ?>>> feature,
-                            final YangInstanceIdentifier path,
-                            final ActorRef sender, final ActorRef self) {
-        FluentFuture.from(feature).addCallback(new FutureCallback<>() {
+    private static void sendResult(final ListenableFuture<Optional<NormalizedNode>> feature,
+            final YangInstanceIdentifier path, final ActorRef sender, final ActorRef self) {
+        Futures.addCallback(feature, new FutureCallback<>() {
             @Override
-            public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
-                if (!result.isPresent()) {
+            public void onSuccess(final Optional<NormalizedNode> result) {
+                if (result.isEmpty()) {
                     sender.tell(new EmptyReadResponse(), self);
                     return;
                 }