Add changed-leaf-nodes-only subscription extension
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfDataTreeServiceActor.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.actors;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.ReceiveTimeout;
13 import akka.actor.Status;
14 import akka.actor.UntypedAbstractActor;
15 import akka.util.JavaDurationConverters;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import java.time.Duration;
21 import java.util.Optional;
22 import java.util.function.Supplier;
23 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
24 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
25 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
26 import org.opendaylight.netconf.topology.singleton.messages.netconf.CommitRequest;
27 import org.opendaylight.netconf.topology.singleton.messages.netconf.CreateEditConfigRequest;
28 import org.opendaylight.netconf.topology.singleton.messages.netconf.DeleteEditConfigRequest;
29 import org.opendaylight.netconf.topology.singleton.messages.netconf.DiscardChangesRequest;
30 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigRequest;
31 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigWithFieldsRequest;
32 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetRequest;
33 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetWithFieldsRequest;
34 import org.opendaylight.netconf.topology.singleton.messages.netconf.LockRequest;
35 import org.opendaylight.netconf.topology.singleton.messages.netconf.MergeEditConfigRequest;
36 import org.opendaylight.netconf.topology.singleton.messages.netconf.RemoveEditConfigRequest;
37 import org.opendaylight.netconf.topology.singleton.messages.netconf.ReplaceEditConfigRequest;
38 import org.opendaylight.netconf.topology.singleton.messages.netconf.UnlockRequest;
39 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
40 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
41 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 public final class NetconfDataTreeServiceActor extends UntypedAbstractActor {
48     private static final Logger LOG = LoggerFactory.getLogger(NetconfDataTreeServiceActor.class);
49
50     private final NetconfDataTreeService netconfService;
51     private final long idleTimeout;
52
53     private NetconfDataTreeServiceActor(final NetconfDataTreeService netconfService, final Duration idleTimeout) {
54         this.netconfService = netconfService;
55         this.idleTimeout = idleTimeout.toSeconds();
56         if (this.idleTimeout > 0) {
57             context().setReceiveTimeout(JavaDurationConverters.asFiniteDuration(idleTimeout));
58         }
59     }
60
61     static Props props(final NetconfDataTreeService netconfService, final Duration idleTimeout) {
62         return Props.create(NetconfDataTreeServiceActor.class, () ->
63             new NetconfDataTreeServiceActor(netconfService, idleTimeout));
64     }
65
66     @Override
67     public void onReceive(final Object message) {
68         if (message instanceof GetWithFieldsRequest getRequest) {
69             final YangInstanceIdentifier path = getRequest.getPath();
70             final ListenableFuture<Optional<NormalizedNode>> future = netconfService.get(
71                     getRequest.getPath(), getRequest.getFields());
72             context().stop(self());
73             sendResult(future, path, sender(), self());
74         } else if (message instanceof GetRequest getRequest) {
75             final YangInstanceIdentifier path = getRequest.getPath();
76             final ListenableFuture<Optional<NormalizedNode>> future = netconfService.get(path);
77             context().stop(self());
78             sendResult(future, path, sender(), self());
79         } else if (message instanceof GetConfigWithFieldsRequest getConfigRequest) {
80             final YangInstanceIdentifier path = getConfigRequest.getPath();
81             final ListenableFuture<Optional<NormalizedNode>> future = netconfService.getConfig(
82                     path, getConfigRequest.getFields());
83             context().stop(self());
84             sendResult(future, path, sender(), self());
85         } else if (message instanceof GetConfigRequest getConfigRequest) {
86             final YangInstanceIdentifier path = getConfigRequest.getPath();
87             final ListenableFuture<Optional<NormalizedNode>> future = netconfService.getConfig(path);
88             context().stop(self());
89             sendResult(future, path, sender(), self());
90         } else if (message instanceof LockRequest) {
91             invokeRpcCall(netconfService::lock, sender(), self());
92         } else if (message instanceof MergeEditConfigRequest request) {
93             netconfService.merge(
94                 request.getStore(),
95                 request.getNormalizedNodeMessage().getIdentifier(),
96                 request.getNormalizedNodeMessage().getNode(),
97                 Optional.ofNullable(request.getDefaultOperation()));
98         } else if (message instanceof ReplaceEditConfigRequest request) {
99             netconfService.replace(
100                 request.getStore(),
101                 request.getNormalizedNodeMessage().getIdentifier(),
102                 request.getNormalizedNodeMessage().getNode(),
103                 Optional.ofNullable(request.getDefaultOperation()));
104         } else if (message instanceof CreateEditConfigRequest request) {
105             netconfService.create(
106                 request.getStore(),
107                 request.getNormalizedNodeMessage().getIdentifier(),
108                 request.getNormalizedNodeMessage().getNode(),
109                 Optional.ofNullable(request.getDefaultOperation()));
110         } else if (message instanceof DeleteEditConfigRequest request) {
111             netconfService.delete(request.getStore(), request.getPath());
112         } else if (message instanceof RemoveEditConfigRequest request) {
113             netconfService.remove(request.getStore(), request.getPath());
114         } else if (message instanceof CommitRequest) {
115             submit(sender(), self());
116         } else if (message instanceof DiscardChangesRequest) {
117             invokeRpcCall(netconfService::discardChanges, sender(), self());
118         } else if (message instanceof UnlockRequest) {
119             context().stop(self());
120             invokeRpcCall(netconfService::unlock, sender(), self());
121         } else if (message instanceof ReceiveTimeout) {
122             LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
123                 idleTimeout);
124             invokeRpcCall(netconfService::discardChanges, sender(), self());
125             invokeRpcCall(netconfService::unlock, sender(), self());
126             context().stop(self());
127         } else {
128             unhandled(message);
129         }
130     }
131
132     private void submit(final ActorRef requester, final ActorRef self) {
133         Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
134             @Override
135             public void onSuccess(final DOMRpcResult result) {
136                 if (result == null) {
137                     requester.tell(new EmptyResultResponse(), getSender());
138                     return;
139                 }
140                 NormalizedNodeMessage nodeMessageResp = null;
141                 if (result.value() != null) {
142                     nodeMessageResp = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), result.value());
143                 }
144                 requester.tell(new InvokeRpcMessageReply(nodeMessageResp, result.errors()), self);
145             }
146
147             @Override
148             public void onFailure(final Throwable throwable) {
149                 requester.tell(new Status.Failure(throwable), self);
150             }
151         }, MoreExecutors.directExecutor());
152     }
153
154     private void invokeRpcCall(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation,
155         final ActorRef requester, final ActorRef self) {
156         Futures.addCallback(operation.get(), new FutureCallback<DOMRpcResult>() {
157             @Override
158             public void onSuccess(final DOMRpcResult rpcResult) {
159                 if (rpcResult == null) {
160                     requester.tell(new EmptyResultResponse(), getSender());
161                     return;
162                 }
163                 NormalizedNodeMessage nodeMessageResp = null;
164                 if (rpcResult.value() != null) {
165                     nodeMessageResp = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), rpcResult.value());
166                 }
167                 requester.tell(new InvokeRpcMessageReply(nodeMessageResp, rpcResult.errors()), self);
168             }
169
170             @Override
171             public void onFailure(final Throwable throwable) {
172                 requester.tell(new Status.Failure(throwable), self);
173             }
174         }, MoreExecutors.directExecutor());
175     }
176
177     private static void sendResult(final ListenableFuture<Optional<NormalizedNode>> feature,
178             final YangInstanceIdentifier path, final ActorRef sender, final ActorRef self) {
179         Futures.addCallback(feature, new FutureCallback<>() {
180             @Override
181             public void onSuccess(final Optional<NormalizedNode> result) {
182                 if (result.isEmpty()) {
183                     sender.tell(new EmptyReadResponse(), self);
184                     return;
185                 }
186                 sender.tell(new NormalizedNodeMessage(path, result.get()), self);
187             }
188
189             @Override
190             public void onFailure(final Throwable throwable) {
191                 sender.tell(new Status.Failure(throwable), self);
192             }
193         }, MoreExecutors.directExecutor());
194     }
195 }