Add changed-leaf-nodes-only subscription extension
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / netconf / ActorProxyNetconfServiceFacade.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.netconf;
9
10 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
12
13 import akka.actor.ActorRef;
14 import akka.dispatch.OnComplete;
15 import akka.pattern.AskTimeoutException;
16 import akka.pattern.Patterns;
17 import akka.util.Timeout;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.SettableFuture;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Objects;
24 import java.util.Optional;
25 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
26 import org.opendaylight.mdsal.common.api.ReadFailedException;
27 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
28 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
29 import org.opendaylight.netconf.api.EffectiveOperation;
30 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceId;
31 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
32 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
33 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
34 import org.opendaylight.netconf.topology.singleton.messages.netconf.CommitRequest;
35 import org.opendaylight.netconf.topology.singleton.messages.netconf.CreateEditConfigRequest;
36 import org.opendaylight.netconf.topology.singleton.messages.netconf.DeleteEditConfigRequest;
37 import org.opendaylight.netconf.topology.singleton.messages.netconf.DiscardChangesRequest;
38 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigRequest;
39 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigWithFieldsRequest;
40 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetRequest;
41 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetWithFieldsRequest;
42 import org.opendaylight.netconf.topology.singleton.messages.netconf.LockRequest;
43 import org.opendaylight.netconf.topology.singleton.messages.netconf.MergeEditConfigRequest;
44 import org.opendaylight.netconf.topology.singleton.messages.netconf.RemoveEditConfigRequest;
45 import org.opendaylight.netconf.topology.singleton.messages.netconf.ReplaceEditConfigRequest;
46 import org.opendaylight.netconf.topology.singleton.messages.netconf.UnlockRequest;
47 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
48 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
51 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54 import scala.concurrent.ExecutionContext;
55 import scala.concurrent.Future;
56
57 public class ActorProxyNetconfServiceFacade implements ProxyNetconfServiceFacade {
58     private static final Logger LOG = LoggerFactory.getLogger(ActorProxyNetconfServiceFacade.class);
59
60     private final ActorRef masterActor;
61     private final RemoteDeviceId id;
62     private final ExecutionContext executionContext;
63     private final Timeout askTimeout;
64
65     public ActorProxyNetconfServiceFacade(final ActorRef masterActor, final RemoteDeviceId id,
66                                           final ExecutionContext executionContext, final Timeout askTimeout) {
67         this.masterActor = Objects.requireNonNull(masterActor);
68         this.id = Objects.requireNonNull(id);
69         this.executionContext = Objects.requireNonNull(executionContext);
70         this.askTimeout = Objects.requireNonNull(askTimeout);
71     }
72
73     @Override
74     public ListenableFuture<DOMRpcResult> lock() {
75         LOG.debug("{}: Lock via actor {}", id, masterActor);
76         final SettableFuture<DOMRpcResult> lockResult = SettableFuture.create();
77         final Future<Object> future = Patterns.ask(masterActor, new LockRequest(), askTimeout);
78         future.onComplete(new OnComplete<>() {
79             @Override
80             public void onComplete(final Throwable failure, final Object response) {
81                 if (failure != null) {
82                     lockResult.setException(failure);
83                 } else if (response instanceof InvokeRpcMessageReply) {
84                     lockResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
85                 } else {
86                     lockResult.setException(new ClusteringRpcException("Lock operation returned unexpected type"));
87                     LOG.error("{}: Lock via actor {} returned unexpected type", id, masterActor);
88                 }
89             }
90         }, executionContext);
91         return lockResult;
92     }
93
94     @Override
95     public ListenableFuture<DOMRpcResult> unlock() {
96         LOG.debug("{}: Unlock via actor {}", id, masterActor);
97         final SettableFuture<DOMRpcResult> unlockResult = SettableFuture.create();
98         final Future<Object> future = Patterns.ask(masterActor, new UnlockRequest(), askTimeout);
99         future.onComplete(new OnComplete<>() {
100             @Override
101             public void onComplete(final Throwable failure, final Object response) {
102                 if (failure != null) {
103                     unlockResult.setException(failure);
104                 } else if (response instanceof InvokeRpcMessageReply) {
105                     unlockResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
106                 } else {
107                     unlockResult.setException(new ClusteringRpcException("Unlock operation returned unexpected type"));
108                     LOG.error("{}: Unlock via actor {} returned unexpected type", id, masterActor);
109                 }
110             }
111         }, executionContext);
112         return unlockResult;
113     }
114
115     @Override
116     public ListenableFuture<DOMRpcResult> discardChanges() {
117         LOG.debug("{}: Discard changes via actor {}", id, masterActor);
118         final SettableFuture<DOMRpcResult> discardChangesResult = SettableFuture.create();
119         final Future<Object> future = Patterns.ask(masterActor, new DiscardChangesRequest(), askTimeout);
120         future.onComplete(new OnComplete<>() {
121             @Override
122             public void onComplete(final Throwable failure, final Object response) {
123                 if (failure != null) {
124                     discardChangesResult.setException(failure);
125                 } else if (response instanceof InvokeRpcMessageReply) {
126                     discardChangesResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
127                 } else {
128                     discardChangesResult.setException(
129                         new ClusteringRpcException("Discard changes operation returned unexpected type"));
130                     LOG.error("{}: Discard changes  via actor {} returned unexpected type", id, masterActor);
131                 }
132             }
133         }, executionContext);
134         return discardChangesResult;
135     }
136
137     @Override
138     public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path) {
139         LOG.debug("{}: Get {} {} via actor {}", id, OPERATIONAL, path, masterActor);
140         final Future<Object> future = Patterns.ask(masterActor, new GetRequest(path), askTimeout);
141         return read(future, OPERATIONAL, path);
142     }
143
144     @Override
145     public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path,
146             final List<YangInstanceIdentifier> fields) {
147         LOG.debug("{}: Get {} {} with fields {} via actor {}", id, OPERATIONAL, path, fields, masterActor);
148         final Future<Object> future = Patterns.ask(masterActor, new GetWithFieldsRequest(path, fields), askTimeout);
149         return read(future, OPERATIONAL, path);
150     }
151
152     @Override
153     public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path) {
154         LOG.debug("{}: GetConfig {} {} via actor {}", id, CONFIGURATION, path, masterActor);
155         final Future<Object> future = Patterns.ask(masterActor, new GetConfigRequest(path), askTimeout);
156         return read(future, CONFIGURATION, path);
157     }
158
159     @Override
160     public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path,
161             final List<YangInstanceIdentifier> fields) {
162         LOG.debug("{}: GetConfig {} {} with fields {} via actor {}", id, CONFIGURATION, path, fields, masterActor);
163         final Future<Object> future = Patterns.ask(masterActor,
164                 new GetConfigWithFieldsRequest(path, fields), askTimeout);
165         return read(future, CONFIGURATION, path);
166     }
167
168     @Override
169     public ListenableFuture<? extends DOMRpcResult> merge(final LogicalDatastoreType store,
170             final YangInstanceIdentifier path, final NormalizedNode data,
171             final Optional<EffectiveOperation> defaultOperation) {
172         LOG.debug("{}: Merge {} {} via actor {}", id, store, path, masterActor);
173         masterActor.tell(new MergeEditConfigRequest(
174             store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
175         return createResult();
176
177     }
178
179     @Override
180     public ListenableFuture<? extends DOMRpcResult> replace(final LogicalDatastoreType store,
181             final YangInstanceIdentifier path, final NormalizedNode data,
182             final Optional<EffectiveOperation> defaultOperation) {
183         LOG.debug("{}: Replace {} {} via actor {}", id, store, path, masterActor);
184
185         masterActor.tell(new ReplaceEditConfigRequest(
186             store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
187         return createResult();
188     }
189
190     @Override
191     public ListenableFuture<? extends DOMRpcResult> create(final LogicalDatastoreType store,
192             final YangInstanceIdentifier path, final NormalizedNode data,
193             final Optional<EffectiveOperation> defaultOperation) {
194         LOG.debug("{}: Create {} {} via actor {}", id, store, path, masterActor);
195         masterActor.tell(new CreateEditConfigRequest(
196             store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
197         return createResult();
198     }
199
200     @Override
201     public ListenableFuture<? extends DOMRpcResult> delete(final LogicalDatastoreType store,
202             final YangInstanceIdentifier path) {
203         LOG.debug("{}: Delete {} {} via actor {}", id, store, path, masterActor);
204         masterActor.tell(new DeleteEditConfigRequest(store, path), ActorRef.noSender());
205         return createResult();
206     }
207
208     @Override
209     public ListenableFuture<? extends DOMRpcResult> remove(final LogicalDatastoreType store,
210             final YangInstanceIdentifier path) {
211         LOG.debug("{}: Remove {} {} via actor {}", id, store, path, masterActor);
212         masterActor.tell(new RemoveEditConfigRequest(store, path), ActorRef.noSender());
213         return createResult();
214     }
215
216     @Override
217     public ListenableFuture<? extends DOMRpcResult> commit() {
218         LOG.debug("{}: Commit via actor {}", id, masterActor);
219
220         final Future<Object> future = Patterns.ask(masterActor, new CommitRequest(), askTimeout);
221         final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
222         future.onComplete(new OnComplete<>() {
223             @Override
224             public void onComplete(final Throwable failure, final Object response) {
225                 if (failure != null) {
226                     LOG.debug("{}: Commit failed", id, failure);
227                     settableFuture.setException(newNetconfServiceFailedException(processFailure(failure)));
228                 } else if (response instanceof InvokeRpcMessageReply) {
229                     LOG.debug("{}: Commit succeeded", id);
230                     settableFuture.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
231                 } else {
232                     settableFuture.setException(
233                         new ClusteringRpcException("Commit operation returned unexpected type"));
234                     LOG.error("{}: Commit via actor {} returned unexpected type", id, masterActor);
235                 }
236             }
237
238             private NetconfServiceFailedException newNetconfServiceFailedException(final Throwable failure) {
239                 return new NetconfServiceFailedException(String.format("%s: Commit of operation failed",
240                     getDeviceId()), failure);
241             }
242         }, executionContext);
243         return settableFuture;
244     }
245
246     @Override
247     public Object getDeviceId() {
248         return id;
249     }
250
251     private SettableFuture<Optional<NormalizedNode>> read(final Future<Object> future, final LogicalDatastoreType store,
252                                                           final YangInstanceIdentifier path) {
253         final SettableFuture<Optional<NormalizedNode>> settableFuture = SettableFuture.create();
254         future.onComplete(new OnComplete<>() {
255             @Override
256             public void onComplete(final Throwable failure, final Object response) {
257                 if (failure != null) {
258                     LOG.debug("{}: Read {} {} failed", id, store, path, failure);
259
260                     final Throwable processedFailure = processFailure(failure);
261                     if (processedFailure instanceof ReadFailedException) {
262                         settableFuture.setException(processedFailure);
263                     } else {
264                         settableFuture.setException(new ReadFailedException("Read of store " + store + " path " + path
265                             + " failed", processedFailure));
266                     }
267                     return;
268                 }
269
270                 LOG.debug("{}: Read {} {} succeeded: {}", id, store, path, response);
271
272                 if (response instanceof EmptyReadResponse) {
273                     settableFuture.set(Optional.empty());
274                     return;
275                 }
276
277                 if (response instanceof NormalizedNodeMessage data) {
278                     settableFuture.set(Optional.of(data.getNode()));
279                 }
280             }
281         }, executionContext);
282
283         return settableFuture;
284     }
285
286     private Throwable processFailure(final Throwable failure) {
287         return failure instanceof AskTimeoutException
288             ? NetconfTopologyUtils.createMasterIsDownException(id, (Exception) failure) : failure;
289     }
290
291     // FIXME: this is being used in contexts where we should be waiting for a reply
292     private static ListenableFuture<? extends DOMRpcResult> createResult() {
293         return Futures.immediateFuture(new DefaultDOMRpcResult());
294     }
295
296     private static DOMRpcResult mapInvokeRpcMessageReplyToDOMRpcResult(final InvokeRpcMessageReply reply) {
297         if (reply.getNormalizedNodeMessage() == null) {
298             return new DefaultDOMRpcResult(new ArrayList<>(reply.getRpcErrors()));
299         } else {
300             return new DefaultDOMRpcResult((ContainerNode) reply.getNormalizedNodeMessage().getNode(),
301                 reply.getRpcErrors());
302         }
303     }
304 }