f6195e3d8b6c2b31f60325c6de04aa7cbb075f71
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / netconf / ActorProxyNetconfServiceFacade.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.netconf;
9
10 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
12
13 import akka.actor.ActorRef;
14 import akka.dispatch.OnComplete;
15 import akka.pattern.AskTimeoutException;
16 import akka.pattern.Patterns;
17 import akka.util.Timeout;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Objects;
24 import java.util.Optional;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.mdsal.common.api.ReadFailedException;
28 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
29 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
30 import org.opendaylight.netconf.api.ModifyAction;
31 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
32 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
33 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
34 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
35 import org.opendaylight.netconf.topology.singleton.messages.netconf.CommitRequest;
36 import org.opendaylight.netconf.topology.singleton.messages.netconf.CreateEditConfigRequest;
37 import org.opendaylight.netconf.topology.singleton.messages.netconf.DeleteEditConfigRequest;
38 import org.opendaylight.netconf.topology.singleton.messages.netconf.DiscardChangesRequest;
39 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigRequest;
40 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigWithFieldsRequest;
41 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetRequest;
42 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetWithFieldsRequest;
43 import org.opendaylight.netconf.topology.singleton.messages.netconf.LockRequest;
44 import org.opendaylight.netconf.topology.singleton.messages.netconf.MergeEditConfigRequest;
45 import org.opendaylight.netconf.topology.singleton.messages.netconf.RemoveEditConfigRequest;
46 import org.opendaylight.netconf.topology.singleton.messages.netconf.ReplaceEditConfigRequest;
47 import org.opendaylight.netconf.topology.singleton.messages.netconf.UnlockRequest;
48 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
49 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54 import scala.concurrent.ExecutionContext;
55 import scala.concurrent.Future;
56
57 public class ActorProxyNetconfServiceFacade implements ProxyNetconfServiceFacade {
58     private static final Logger LOG = LoggerFactory.getLogger(ActorProxyNetconfServiceFacade.class);
59
60     private final ActorRef masterActor;
61     private final RemoteDeviceId id;
62     private final ExecutionContext executionContext;
63     private final Timeout askTimeout;
64
65     public ActorProxyNetconfServiceFacade(final ActorRef masterActor, final RemoteDeviceId id,
66                                           final ExecutionContext executionContext, final Timeout askTimeout) {
67         this.masterActor = Objects.requireNonNull(masterActor);
68         this.id = Objects.requireNonNull(id);
69         this.executionContext = Objects.requireNonNull(executionContext);
70         this.askTimeout = Objects.requireNonNull(askTimeout);
71     }
72
73     @Override
74     public ListenableFuture<DOMRpcResult> lock() {
75         LOG.debug("{}: Lock via actor {}", id, masterActor);
76         final SettableFuture<DOMRpcResult> lockResult = SettableFuture.create();
77         final Future<Object> future = Patterns.ask(masterActor, new LockRequest(), askTimeout);
78         future.onComplete(new OnComplete<>() {
79             @Override
80             public void onComplete(final Throwable failure, final Object response) {
81                 if (failure != null) {
82                     lockResult.setException(failure);
83                 } else if (response instanceof InvokeRpcMessageReply) {
84                     lockResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
85                 } else {
86                     lockResult.setException(new ClusteringRpcException("Lock operation returned unexpected type"));
87                     LOG.error("{}: Lock via actor {} returned unexpected type", id, masterActor);
88                 }
89             }
90         }, executionContext);
91         return lockResult;
92     }
93
94     @Override
95     public ListenableFuture<DOMRpcResult> unlock() {
96         LOG.debug("{}: Unlock via actor {}", id, masterActor);
97         final SettableFuture<DOMRpcResult> unlockResult = SettableFuture.create();
98         final Future<Object> future = Patterns.ask(masterActor, new UnlockRequest(), askTimeout);
99         future.onComplete(new OnComplete<>() {
100             @Override
101             public void onComplete(final Throwable failure, final Object response) {
102                 if (failure != null) {
103                     unlockResult.setException(failure);
104                 } else if (response instanceof InvokeRpcMessageReply) {
105                     unlockResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
106                 } else {
107                     unlockResult.setException(new ClusteringRpcException("Unlock operation returned unexpected type"));
108                     LOG.error("{}: Unlock via actor {} returned unexpected type", id, masterActor);
109                 }
110             }
111         }, executionContext);
112         return unlockResult;
113     }
114
115     @Override
116     public ListenableFuture<DOMRpcResult> discardChanges() {
117         LOG.debug("{}: Discard changes via actor {}", id, masterActor);
118         final SettableFuture<DOMRpcResult> discardChangesResult = SettableFuture.create();
119         final Future<Object> future = Patterns.ask(masterActor, new DiscardChangesRequest(), askTimeout);
120         future.onComplete(new OnComplete<>() {
121             @Override
122             public void onComplete(final Throwable failure, final Object response) {
123                 if (failure != null) {
124                     discardChangesResult.setException(failure);
125                 } else if (response instanceof InvokeRpcMessageReply) {
126                     discardChangesResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
127                 } else {
128                     discardChangesResult.setException(
129                         new ClusteringRpcException("Discard changes operation returned unexpected type"));
130                     LOG.error("{}: Discard changes  via actor {} returned unexpected type", id, masterActor);
131                 }
132             }
133         }, executionContext);
134         return discardChangesResult;
135     }
136
137     @Override
138     public ListenableFuture<Optional<NormalizedNode<?, ?>>> get(final YangInstanceIdentifier path) {
139         LOG.debug("{}: Get {} {} via actor {}", id, OPERATIONAL, path, masterActor);
140         final Future<Object> future = Patterns.ask(masterActor, new GetRequest(path), askTimeout);
141         return read(future, OPERATIONAL, path);
142     }
143
144     @Override
145     public ListenableFuture<Optional<NormalizedNode<?, ?>>> get(final YangInstanceIdentifier path,
146             final List<YangInstanceIdentifier> fields) {
147         LOG.debug("{}: Get {} {} with fields {} via actor {}", id, OPERATIONAL, path, fields, masterActor);
148         final Future<Object> future = Patterns.ask(masterActor, new GetWithFieldsRequest(path, fields), askTimeout);
149         return read(future, OPERATIONAL, path);
150     }
151
152     @Override
153     public ListenableFuture<Optional<NormalizedNode<?, ?>>> getConfig(final YangInstanceIdentifier path) {
154         LOG.debug("{}: GetConfig {} {} via actor {}", id, CONFIGURATION, path, masterActor);
155         final Future<Object> future = Patterns.ask(masterActor, new GetConfigRequest(path), askTimeout);
156         return read(future, CONFIGURATION, path);
157     }
158
159     @Override
160     public ListenableFuture<Optional<NormalizedNode<?, ?>>> getConfig(final YangInstanceIdentifier path,
161             final List<YangInstanceIdentifier> fields) {
162         LOG.debug("{}: GetConfig {} {} with fields {} via actor {}", id, CONFIGURATION, path, fields, masterActor);
163         final Future<Object> future = Patterns.ask(masterActor,
164                 new GetConfigWithFieldsRequest(path, fields), askTimeout);
165         return read(future, CONFIGURATION, path);
166     }
167
168     @Override
169     public ListenableFuture<? extends DOMRpcResult> merge(final LogicalDatastoreType store,
170             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
171             final Optional<ModifyAction> defaultOperation) {
172         LOG.debug("{}: Merge {} {} via actor {}", id, store, path, masterActor);
173         masterActor.tell(new MergeEditConfigRequest(
174             store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
175         return createResult();
176
177     }
178
179     @Override
180     public ListenableFuture<? extends DOMRpcResult> replace(final LogicalDatastoreType store,
181             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
182             final Optional<ModifyAction> defaultOperation) {
183         LOG.debug("{}: Replace {} {} via actor {}", id, store, path, masterActor);
184
185         masterActor.tell(new ReplaceEditConfigRequest(
186             store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
187         return createResult();
188     }
189
190     @Override
191     public ListenableFuture<? extends DOMRpcResult> create(final LogicalDatastoreType store,
192             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
193             final Optional<ModifyAction> defaultOperation) {
194         LOG.debug("{}: Create {} {} via actor {}", id, store, path, masterActor);
195         masterActor.tell(new CreateEditConfigRequest(
196             store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
197         return createResult();
198     }
199
200     @Override
201     public ListenableFuture<? extends DOMRpcResult> delete(final LogicalDatastoreType store,
202             final YangInstanceIdentifier path) {
203         LOG.debug("{}: Delete {} {} via actor {}", id, store, path, masterActor);
204         masterActor.tell(new DeleteEditConfigRequest(store, path), ActorRef.noSender());
205         return createResult();
206     }
207
208     @Override
209     public ListenableFuture<? extends DOMRpcResult> remove(final LogicalDatastoreType store,
210             final YangInstanceIdentifier path) {
211         LOG.debug("{}: Remove {} {} via actor {}", id, store, path, masterActor);
212         masterActor.tell(new RemoveEditConfigRequest(store, path), ActorRef.noSender());
213         return createResult();
214     }
215
216     @Override
217     public ListenableFuture<? extends DOMRpcResult> commit() {
218         LOG.debug("{}: Commit via actor {}", id, masterActor);
219
220         final Future<Object> future = Patterns.ask(masterActor, new CommitRequest(), askTimeout);
221         final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
222         future.onComplete(new OnComplete<>() {
223             @Override
224             public void onComplete(final Throwable failure, final Object response) {
225                 if (failure != null) {
226                     LOG.debug("{}: Commit failed", id, failure);
227                     settableFuture.setException(newNetconfServiceFailedException(processFailure(failure)));
228                 } else if (response instanceof InvokeRpcMessageReply) {
229                     LOG.debug("{}: Commit succeeded", id);
230                     settableFuture.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
231                 } else {
232                     settableFuture.setException(
233                         new ClusteringRpcException("Commit operation returned unexpected type"));
234                     LOG.error("{}: Commit via actor {} returned unexpected type", id, masterActor);
235                 }
236             }
237
238             private NetconfServiceFailedException newNetconfServiceFailedException(final Throwable failure) {
239                 return new NetconfServiceFailedException(String.format("%s: Commit of operation failed",
240                     getDeviceId()), failure);
241             }
242         }, executionContext);
243         return settableFuture;
244     }
245
246     @Override
247     public @NonNull Object getDeviceId() {
248         return id;
249     }
250
251     private SettableFuture<Optional<NormalizedNode<?, ?>>> read(final Future<Object> future,
252                                                                 final LogicalDatastoreType store,
253                                                                 final YangInstanceIdentifier path) {
254         final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
255         future.onComplete(new OnComplete<>() {
256             @Override
257             public void onComplete(final Throwable failure, final Object response) {
258                 if (failure != null) {
259                     LOG.debug("{}: Read {} {} failed", id, store, path, failure);
260
261                     final Throwable processedFailure = processFailure(failure);
262                     if (processedFailure instanceof ReadFailedException) {
263                         settableFuture.setException(processedFailure);
264                     } else {
265                         settableFuture.setException(new ReadFailedException("Read of store " + store + " path " + path
266                             + " failed", processedFailure));
267                     }
268                     return;
269                 }
270
271                 LOG.debug("{}: Read {} {} succeeded: {}", id, store, path, response);
272
273                 if (response instanceof EmptyReadResponse) {
274                     settableFuture.set(Optional.empty());
275                     return;
276                 }
277
278                 if (response instanceof NormalizedNodeMessage) {
279                     final NormalizedNodeMessage data = (NormalizedNodeMessage) response;
280                     settableFuture.set(Optional.of(data.getNode()));
281                 }
282             }
283         }, executionContext);
284
285         return settableFuture;
286     }
287
288     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
289         justification = "https://github.com/spotbugs/spotbugs/issues/811")
290     private Throwable processFailure(final Throwable failure) {
291         return failure instanceof AskTimeoutException
292             ? NetconfTopologyUtils.createMasterIsDownException(id, (Exception) failure) : failure;
293     }
294
295     private ListenableFuture<? extends DOMRpcResult> createResult() {
296         final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
297         settableFuture.set(new DefaultDOMRpcResult());
298         return settableFuture;
299     }
300
301     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
302         justification = "https://github.com/spotbugs/spotbugs/issues/811")
303     private static DOMRpcResult mapInvokeRpcMessageReplyToDOMRpcResult(final InvokeRpcMessageReply reply) {
304         if (reply.getNormalizedNodeMessage() == null) {
305             return new DefaultDOMRpcResult(new ArrayList<>(reply.getRpcErrors()));
306         } else {
307             return new DefaultDOMRpcResult(reply.getNormalizedNodeMessage().getNode(), reply.getRpcErrors());
308         }
309     }
310 }