Bump MRI upstreams
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfDataTreeServiceActor.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.actors;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.ReceiveTimeout;
13 import akka.actor.Status;
14 import akka.actor.UntypedAbstractActor;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.util.Optional;
20 import java.util.function.Supplier;
21 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
22 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
23 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
24 import org.opendaylight.netconf.topology.singleton.messages.netconf.CommitRequest;
25 import org.opendaylight.netconf.topology.singleton.messages.netconf.CreateEditConfigRequest;
26 import org.opendaylight.netconf.topology.singleton.messages.netconf.DeleteEditConfigRequest;
27 import org.opendaylight.netconf.topology.singleton.messages.netconf.DiscardChangesRequest;
28 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigRequest;
29 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigWithFieldsRequest;
30 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetRequest;
31 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetWithFieldsRequest;
32 import org.opendaylight.netconf.topology.singleton.messages.netconf.LockRequest;
33 import org.opendaylight.netconf.topology.singleton.messages.netconf.MergeEditConfigRequest;
34 import org.opendaylight.netconf.topology.singleton.messages.netconf.RemoveEditConfigRequest;
35 import org.opendaylight.netconf.topology.singleton.messages.netconf.ReplaceEditConfigRequest;
36 import org.opendaylight.netconf.topology.singleton.messages.netconf.UnlockRequest;
37 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
38 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
39 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
40 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import scala.concurrent.duration.Duration;
45
46 public final class NetconfDataTreeServiceActor extends UntypedAbstractActor {
47     private static final Logger LOG = LoggerFactory.getLogger(NetconfDataTreeServiceActor.class);
48
49     private final NetconfDataTreeService netconfService;
50     private final long idleTimeout;
51
52     private NetconfDataTreeServiceActor(final NetconfDataTreeService netconfService, final Duration idleTimeout) {
53         this.netconfService = netconfService;
54         this.idleTimeout = idleTimeout.toSeconds();
55         if (this.idleTimeout > 0) {
56             context().setReceiveTimeout(idleTimeout);
57         }
58     }
59
60     static Props props(final NetconfDataTreeService netconfService, final Duration idleTimeout) {
61         return Props.create(NetconfDataTreeServiceActor.class, () ->
62             new NetconfDataTreeServiceActor(netconfService, idleTimeout));
63     }
64
65     @Override
66     public void onReceive(final Object message) {
67         if (message instanceof GetWithFieldsRequest) {
68             final GetWithFieldsRequest getRequest = (GetWithFieldsRequest) message;
69             final YangInstanceIdentifier path = getRequest.getPath();
70             final ListenableFuture<Optional<NormalizedNode>> future = netconfService.get(
71                     getRequest.getPath(), getRequest.getFields());
72             context().stop(self());
73             sendResult(future, path, sender(), self());
74         } else if (message instanceof GetRequest) {
75             final GetRequest getRequest = (GetRequest) message;
76             final YangInstanceIdentifier path = getRequest.getPath();
77             final ListenableFuture<Optional<NormalizedNode>> future = netconfService.get(path);
78             context().stop(self());
79             sendResult(future, path, sender(), self());
80         } else if (message instanceof GetConfigWithFieldsRequest) {
81             final GetConfigWithFieldsRequest getConfigRequest = (GetConfigWithFieldsRequest) message;
82             final YangInstanceIdentifier path = getConfigRequest.getPath();
83             final ListenableFuture<Optional<NormalizedNode>> future = netconfService.getConfig(
84                     path, getConfigRequest.getFields());
85             context().stop(self());
86             sendResult(future, path, sender(), self());
87         } else if (message instanceof GetConfigRequest) {
88             final GetConfigRequest getConfigRequest = (GetConfigRequest) message;
89             final YangInstanceIdentifier path = getConfigRequest.getPath();
90             final ListenableFuture<Optional<NormalizedNode>> future = netconfService.getConfig(path);
91             context().stop(self());
92             sendResult(future, path, sender(), self());
93         } else if (message instanceof LockRequest) {
94             invokeRpcCall(netconfService::lock, sender(), self());
95         } else if (message instanceof MergeEditConfigRequest) {
96             final MergeEditConfigRequest request = (MergeEditConfigRequest) message;
97             netconfService.merge(
98                 request.getStore(),
99                 request.getNormalizedNodeMessage().getIdentifier(),
100                 request.getNormalizedNodeMessage().getNode(),
101                 Optional.ofNullable(request.getDefaultOperation()));
102         } else if (message instanceof ReplaceEditConfigRequest) {
103             final ReplaceEditConfigRequest request = (ReplaceEditConfigRequest) message;
104             netconfService.replace(
105                 request.getStore(),
106                 request.getNormalizedNodeMessage().getIdentifier(),
107                 request.getNormalizedNodeMessage().getNode(),
108                 Optional.ofNullable(request.getDefaultOperation()));
109         } else if (message instanceof CreateEditConfigRequest) {
110             final CreateEditConfigRequest request = (CreateEditConfigRequest) message;
111             netconfService.create(
112                 request.getStore(),
113                 request.getNormalizedNodeMessage().getIdentifier(),
114                 request.getNormalizedNodeMessage().getNode(),
115                 Optional.ofNullable(request.getDefaultOperation()));
116         } else if (message instanceof DeleteEditConfigRequest) {
117             final DeleteEditConfigRequest request = (DeleteEditConfigRequest) message;
118             netconfService.delete(request.getStore(), request.getPath());
119         } else if (message instanceof RemoveEditConfigRequest) {
120             final RemoveEditConfigRequest request = (RemoveEditConfigRequest) message;
121             netconfService.remove(request.getStore(), request.getPath());
122         } else if (message instanceof CommitRequest) {
123             submit(sender(), self());
124         } else if (message instanceof DiscardChangesRequest) {
125             invokeRpcCall(netconfService::discardChanges, sender(), self());
126         } else if (message instanceof UnlockRequest) {
127             context().stop(self());
128             invokeRpcCall(netconfService::unlock, sender(), self());
129         } else if (message instanceof ReceiveTimeout) {
130             LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
131                 idleTimeout);
132             invokeRpcCall(netconfService::discardChanges, sender(), self());
133             invokeRpcCall(netconfService::unlock, sender(), self());
134             context().stop(self());
135         } else {
136             unhandled(message);
137         }
138     }
139
140     private void submit(final ActorRef requester, final ActorRef self) {
141         Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
142             @Override
143             public void onSuccess(final DOMRpcResult result) {
144                 if (result == null) {
145                     requester.tell(new EmptyResultResponse(), getSender());
146                     return;
147                 }
148                 NormalizedNodeMessage nodeMessageResp = null;
149                 if (result.getResult() != null) {
150                     nodeMessageResp = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), result.getResult());
151                 }
152                 requester.tell(new InvokeRpcMessageReply(nodeMessageResp, result.getErrors()), self);
153             }
154
155             @Override
156             public void onFailure(final Throwable throwable) {
157                 requester.tell(new Status.Failure(throwable), self);
158             }
159         }, MoreExecutors.directExecutor());
160     }
161
162     private void invokeRpcCall(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation,
163         final ActorRef requester, final ActorRef self) {
164         Futures.addCallback(operation.get(), new FutureCallback<DOMRpcResult>() {
165             @Override
166             public void onSuccess(final DOMRpcResult rpcResult) {
167                 if (rpcResult == null) {
168                     requester.tell(new EmptyResultResponse(), getSender());
169                     return;
170                 }
171                 NormalizedNodeMessage nodeMessageResp = null;
172                 if (rpcResult.getResult() != null) {
173                     nodeMessageResp = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), rpcResult.getResult());
174                 }
175                 requester.tell(new InvokeRpcMessageReply(nodeMessageResp, rpcResult.getErrors()), self);
176             }
177
178             @Override
179             public void onFailure(final Throwable throwable) {
180                 requester.tell(new Status.Failure(throwable), self);
181             }
182         }, MoreExecutors.directExecutor());
183     }
184
185     private static void sendResult(final ListenableFuture<Optional<NormalizedNode>> feature,
186             final YangInstanceIdentifier path, final ActorRef sender, final ActorRef self) {
187         Futures.addCallback(feature, new FutureCallback<>() {
188             @Override
189             public void onSuccess(final Optional<NormalizedNode> result) {
190                 if (result.isEmpty()) {
191                     sender.tell(new EmptyReadResponse(), self);
192                     return;
193                 }
194                 sender.tell(new NormalizedNodeMessage(path, result.get()), self);
195             }
196
197             @Override
198             public void onFailure(final Throwable throwable) {
199                 sender.tell(new Status.Failure(throwable), self);
200             }
201         }, MoreExecutors.directExecutor());
202     }
203 }