X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-topology-singleton%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftopology%2Fsingleton%2Fimpl%2Factors%2FNetconfDataTreeServiceActor.java;h=dff990f8d74325225a74204526dd0ba47a0c03e9;hb=refs%2Fchanges%2F48%2F103748%2F2;hp=1297b88eb50d001892e23be15758069d10295592;hpb=9631593590312ae7641077c182c1fb59723b2241;p=netconf.git diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfDataTreeServiceActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfDataTreeServiceActor.java index 1297b88eb5..dff990f8d7 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfDataTreeServiceActor.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfDataTreeServiceActor.java @@ -12,14 +12,14 @@ import akka.actor.Props; import akka.actor.ReceiveTimeout; import akka.actor.Status; import akka.actor.UntypedAbstractActor; -import com.google.common.util.concurrent.FluentFuture; +import akka.util.JavaDurationConverters; import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import java.util.ArrayList; -import java.util.List; +import java.time.Duration; import java.util.Optional; -import org.opendaylight.mdsal.common.api.CommitInfo; +import java.util.function.Supplier; import org.opendaylight.mdsal.dom.api.DOMRpcResult; import org.opendaylight.netconf.dom.api.NetconfDataTreeService; import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; @@ -36,12 +36,13 @@ import org.opendaylight.netconf.topology.singleton.messages.netconf.MergeEditCon import org.opendaylight.netconf.topology.singleton.messages.netconf.RemoveEditConfigRequest; import org.opendaylight.netconf.topology.singleton.messages.netconf.ReplaceEditConfigRequest; import org.opendaylight.netconf.topology.singleton.messages.netconf.UnlockRequest; +import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse; +import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.duration.Duration; public final class NetconfDataTreeServiceActor extends UntypedAbstractActor { private static final Logger LOG = LoggerFactory.getLogger(NetconfDataTreeServiceActor.class); @@ -49,13 +50,11 @@ public final class NetconfDataTreeServiceActor extends UntypedAbstractActor { private final NetconfDataTreeService netconfService; private final long idleTimeout; - private List> resultsFutures = new ArrayList<>(); - private NetconfDataTreeServiceActor(final NetconfDataTreeService netconfService, final Duration idleTimeout) { this.netconfService = netconfService; this.idleTimeout = idleTimeout.toSeconds(); if (this.idleTimeout > 0) { - context().setReceiveTimeout(idleTimeout); + context().setReceiveTimeout(JavaDurationConverters.asFiniteDuration(idleTimeout)); } } @@ -66,74 +65,64 @@ public final class NetconfDataTreeServiceActor extends UntypedAbstractActor { @Override public void onReceive(final Object message) { - if (message instanceof GetWithFieldsRequest) { - final GetWithFieldsRequest getRequest = (GetWithFieldsRequest) message; + if (message instanceof GetWithFieldsRequest getRequest) { final YangInstanceIdentifier path = getRequest.getPath(); - final ListenableFuture>> future = netconfService.get( + final ListenableFuture> future = netconfService.get( getRequest.getPath(), getRequest.getFields()); context().stop(self()); sendResult(future, path, sender(), self()); - } else if (message instanceof GetRequest) { - final GetRequest getRequest = (GetRequest) message; + } else if (message instanceof GetRequest getRequest) { final YangInstanceIdentifier path = getRequest.getPath(); - final ListenableFuture>> future = netconfService.get(path); + final ListenableFuture> future = netconfService.get(path); context().stop(self()); sendResult(future, path, sender(), self()); - } else if (message instanceof GetConfigWithFieldsRequest) { - final GetConfigWithFieldsRequest getConfigRequest = (GetConfigWithFieldsRequest) message; + } else if (message instanceof GetConfigWithFieldsRequest getConfigRequest) { final YangInstanceIdentifier path = getConfigRequest.getPath(); - final ListenableFuture>> future = netconfService.getConfig( + final ListenableFuture> future = netconfService.getConfig( path, getConfigRequest.getFields()); context().stop(self()); sendResult(future, path, sender(), self()); - } else if (message instanceof GetConfigRequest) { - final GetConfigRequest getConfigRequest = (GetConfigRequest) message; + } else if (message instanceof GetConfigRequest getConfigRequest) { final YangInstanceIdentifier path = getConfigRequest.getPath(); - final ListenableFuture>> future = netconfService.getConfig(path); + final ListenableFuture> future = netconfService.getConfig(path); context().stop(self()); sendResult(future, path, sender(), self()); } else if (message instanceof LockRequest) { - resultsFutures.addAll(netconfService.lock()); - } else if (message instanceof MergeEditConfigRequest) { - final MergeEditConfigRequest request = (MergeEditConfigRequest) message; - resultsFutures.add(netconfService.merge( + invokeRpcCall(netconfService::lock, sender(), self()); + } else if (message instanceof MergeEditConfigRequest request) { + netconfService.merge( request.getStore(), request.getNormalizedNodeMessage().getIdentifier(), request.getNormalizedNodeMessage().getNode(), - Optional.ofNullable(request.getDefaultOperation()))); - } else if (message instanceof ReplaceEditConfigRequest) { - final ReplaceEditConfigRequest request = (ReplaceEditConfigRequest) message; - resultsFutures.add(netconfService.replace( + Optional.ofNullable(request.getDefaultOperation())); + } else if (message instanceof ReplaceEditConfigRequest request) { + netconfService.replace( request.getStore(), request.getNormalizedNodeMessage().getIdentifier(), request.getNormalizedNodeMessage().getNode(), - Optional.ofNullable(request.getDefaultOperation()))); - } else if (message instanceof CreateEditConfigRequest) { - final CreateEditConfigRequest request = (CreateEditConfigRequest) message; - resultsFutures.add(netconfService.create( + Optional.ofNullable(request.getDefaultOperation())); + } else if (message instanceof CreateEditConfigRequest request) { + netconfService.create( request.getStore(), request.getNormalizedNodeMessage().getIdentifier(), request.getNormalizedNodeMessage().getNode(), - Optional.ofNullable(request.getDefaultOperation()))); - } else if (message instanceof DeleteEditConfigRequest) { - final DeleteEditConfigRequest request = (DeleteEditConfigRequest) message; - resultsFutures.add(netconfService.delete(request.getStore(), request.getPath())); - } else if (message instanceof RemoveEditConfigRequest) { - final RemoveEditConfigRequest request = (RemoveEditConfigRequest) message; - resultsFutures.add(netconfService.remove(request.getStore(), request.getPath())); + Optional.ofNullable(request.getDefaultOperation())); + } else if (message instanceof DeleteEditConfigRequest request) { + netconfService.delete(request.getStore(), request.getPath()); + } else if (message instanceof RemoveEditConfigRequest request) { + netconfService.remove(request.getStore(), request.getPath()); } else if (message instanceof CommitRequest) { - context().stop(self()); submit(sender(), self()); } else if (message instanceof DiscardChangesRequest) { - netconfService.discardChanges(); + invokeRpcCall(netconfService::discardChanges, sender(), self()); } else if (message instanceof UnlockRequest) { context().stop(self()); - netconfService.unlock(); + invokeRpcCall(netconfService::unlock, sender(), self()); } else if (message instanceof ReceiveTimeout) { LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor", idleTimeout); - netconfService.discardChanges(); - netconfService.unlock(); + invokeRpcCall(netconfService::discardChanges, sender(), self()); + invokeRpcCall(netconfService::unlock, sender(), self()); context().stop(self()); } else { unhandled(message); @@ -141,11 +130,41 @@ public final class NetconfDataTreeServiceActor extends UntypedAbstractActor { } private void submit(final ActorRef requester, final ActorRef self) { - final ListenableFuture submitFuture = netconfService.commit(resultsFutures); - FluentFuture.from(submitFuture).addCallback(new FutureCallback() { + Futures.addCallback(netconfService.commit(), new FutureCallback() { + @Override + public void onSuccess(final DOMRpcResult result) { + if (result == null) { + requester.tell(new EmptyResultResponse(), getSender()); + return; + } + NormalizedNodeMessage nodeMessageResp = null; + if (result.value() != null) { + nodeMessageResp = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), result.value()); + } + requester.tell(new InvokeRpcMessageReply(nodeMessageResp, result.errors()), self); + } + + @Override + public void onFailure(final Throwable throwable) { + requester.tell(new Status.Failure(throwable), self); + } + }, MoreExecutors.directExecutor()); + } + + private void invokeRpcCall(final Supplier> operation, + final ActorRef requester, final ActorRef self) { + Futures.addCallback(operation.get(), new FutureCallback() { @Override - public void onSuccess(final CommitInfo result) { - requester.tell(new Status.Success(null), self); + public void onSuccess(final DOMRpcResult rpcResult) { + if (rpcResult == null) { + requester.tell(new EmptyResultResponse(), getSender()); + return; + } + NormalizedNodeMessage nodeMessageResp = null; + if (rpcResult.value() != null) { + nodeMessageResp = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), rpcResult.value()); + } + requester.tell(new InvokeRpcMessageReply(nodeMessageResp, rpcResult.errors()), self); } @Override @@ -155,13 +174,12 @@ public final class NetconfDataTreeServiceActor extends UntypedAbstractActor { }, MoreExecutors.directExecutor()); } - private void sendResult(final ListenableFuture>> feature, - final YangInstanceIdentifier path, - final ActorRef sender, final ActorRef self) { - FluentFuture.from(feature).addCallback(new FutureCallback<>() { + private static void sendResult(final ListenableFuture> feature, + final YangInstanceIdentifier path, final ActorRef sender, final ActorRef self) { + Futures.addCallback(feature, new FutureCallback<>() { @Override - public void onSuccess(final Optional> result) { - if (!result.isPresent()) { + public void onSuccess(final Optional result) { + if (result.isEmpty()) { sender.tell(new EmptyReadResponse(), self); return; }