Cleanup ActorProxyNetconfServiceFacade
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / netconf / ActorProxyNetconfServiceFacade.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.netconf;
9
10 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
12
13 import akka.actor.ActorRef;
14 import akka.dispatch.OnComplete;
15 import akka.pattern.AskTimeoutException;
16 import akka.pattern.Patterns;
17 import akka.util.Timeout;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.SettableFuture;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Objects;
24 import java.util.Optional;
25 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
26 import org.opendaylight.mdsal.common.api.ReadFailedException;
27 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
28 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
29 import org.opendaylight.netconf.api.ModifyAction;
30 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
31 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
32 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
33 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
34 import org.opendaylight.netconf.topology.singleton.messages.netconf.CommitRequest;
35 import org.opendaylight.netconf.topology.singleton.messages.netconf.CreateEditConfigRequest;
36 import org.opendaylight.netconf.topology.singleton.messages.netconf.DeleteEditConfigRequest;
37 import org.opendaylight.netconf.topology.singleton.messages.netconf.DiscardChangesRequest;
38 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigRequest;
39 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigWithFieldsRequest;
40 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetRequest;
41 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetWithFieldsRequest;
42 import org.opendaylight.netconf.topology.singleton.messages.netconf.LockRequest;
43 import org.opendaylight.netconf.topology.singleton.messages.netconf.MergeEditConfigRequest;
44 import org.opendaylight.netconf.topology.singleton.messages.netconf.RemoveEditConfigRequest;
45 import org.opendaylight.netconf.topology.singleton.messages.netconf.ReplaceEditConfigRequest;
46 import org.opendaylight.netconf.topology.singleton.messages.netconf.UnlockRequest;
47 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
48 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import scala.concurrent.ExecutionContext;
54 import scala.concurrent.Future;
55
56 public class ActorProxyNetconfServiceFacade implements ProxyNetconfServiceFacade {
57     private static final Logger LOG = LoggerFactory.getLogger(ActorProxyNetconfServiceFacade.class);
58
59     private final ActorRef masterActor;
60     private final RemoteDeviceId id;
61     private final ExecutionContext executionContext;
62     private final Timeout askTimeout;
63
64     public ActorProxyNetconfServiceFacade(final ActorRef masterActor, final RemoteDeviceId id,
65                                           final ExecutionContext executionContext, final Timeout askTimeout) {
66         this.masterActor = Objects.requireNonNull(masterActor);
67         this.id = Objects.requireNonNull(id);
68         this.executionContext = Objects.requireNonNull(executionContext);
69         this.askTimeout = Objects.requireNonNull(askTimeout);
70     }
71
72     @Override
73     public ListenableFuture<DOMRpcResult> lock() {
74         LOG.debug("{}: Lock via actor {}", id, masterActor);
75         final SettableFuture<DOMRpcResult> lockResult = SettableFuture.create();
76         final Future<Object> future = Patterns.ask(masterActor, new LockRequest(), askTimeout);
77         future.onComplete(new OnComplete<>() {
78             @Override
79             public void onComplete(final Throwable failure, final Object response) {
80                 if (failure != null) {
81                     lockResult.setException(failure);
82                 } else if (response instanceof InvokeRpcMessageReply) {
83                     lockResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
84                 } else {
85                     lockResult.setException(new ClusteringRpcException("Lock operation returned unexpected type"));
86                     LOG.error("{}: Lock via actor {} returned unexpected type", id, masterActor);
87                 }
88             }
89         }, executionContext);
90         return lockResult;
91     }
92
93     @Override
94     public ListenableFuture<DOMRpcResult> unlock() {
95         LOG.debug("{}: Unlock via actor {}", id, masterActor);
96         final SettableFuture<DOMRpcResult> unlockResult = SettableFuture.create();
97         final Future<Object> future = Patterns.ask(masterActor, new UnlockRequest(), askTimeout);
98         future.onComplete(new OnComplete<>() {
99             @Override
100             public void onComplete(final Throwable failure, final Object response) {
101                 if (failure != null) {
102                     unlockResult.setException(failure);
103                 } else if (response instanceof InvokeRpcMessageReply) {
104                     unlockResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
105                 } else {
106                     unlockResult.setException(new ClusteringRpcException("Unlock operation returned unexpected type"));
107                     LOG.error("{}: Unlock via actor {} returned unexpected type", id, masterActor);
108                 }
109             }
110         }, executionContext);
111         return unlockResult;
112     }
113
114     @Override
115     public ListenableFuture<DOMRpcResult> discardChanges() {
116         LOG.debug("{}: Discard changes via actor {}", id, masterActor);
117         final SettableFuture<DOMRpcResult> discardChangesResult = SettableFuture.create();
118         final Future<Object> future = Patterns.ask(masterActor, new DiscardChangesRequest(), askTimeout);
119         future.onComplete(new OnComplete<>() {
120             @Override
121             public void onComplete(final Throwable failure, final Object response) {
122                 if (failure != null) {
123                     discardChangesResult.setException(failure);
124                 } else if (response instanceof InvokeRpcMessageReply) {
125                     discardChangesResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
126                 } else {
127                     discardChangesResult.setException(
128                         new ClusteringRpcException("Discard changes operation returned unexpected type"));
129                     LOG.error("{}: Discard changes  via actor {} returned unexpected type", id, masterActor);
130                 }
131             }
132         }, executionContext);
133         return discardChangesResult;
134     }
135
136     @Override
137     public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path) {
138         LOG.debug("{}: Get {} {} via actor {}", id, OPERATIONAL, path, masterActor);
139         final Future<Object> future = Patterns.ask(masterActor, new GetRequest(path), askTimeout);
140         return read(future, OPERATIONAL, path);
141     }
142
143     @Override
144     public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path,
145             final List<YangInstanceIdentifier> fields) {
146         LOG.debug("{}: Get {} {} with fields {} via actor {}", id, OPERATIONAL, path, fields, masterActor);
147         final Future<Object> future = Patterns.ask(masterActor, new GetWithFieldsRequest(path, fields), askTimeout);
148         return read(future, OPERATIONAL, path);
149     }
150
151     @Override
152     public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path) {
153         LOG.debug("{}: GetConfig {} {} via actor {}", id, CONFIGURATION, path, masterActor);
154         final Future<Object> future = Patterns.ask(masterActor, new GetConfigRequest(path), askTimeout);
155         return read(future, CONFIGURATION, path);
156     }
157
158     @Override
159     public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path,
160             final List<YangInstanceIdentifier> fields) {
161         LOG.debug("{}: GetConfig {} {} with fields {} via actor {}", id, CONFIGURATION, path, fields, masterActor);
162         final Future<Object> future = Patterns.ask(masterActor,
163                 new GetConfigWithFieldsRequest(path, fields), askTimeout);
164         return read(future, CONFIGURATION, path);
165     }
166
167     @Override
168     public ListenableFuture<? extends DOMRpcResult> merge(final LogicalDatastoreType store,
169             final YangInstanceIdentifier path, final NormalizedNode data,
170             final Optional<ModifyAction> defaultOperation) {
171         LOG.debug("{}: Merge {} {} via actor {}", id, store, path, masterActor);
172         masterActor.tell(new MergeEditConfigRequest(
173             store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
174         return createResult();
175
176     }
177
178     @Override
179     public ListenableFuture<? extends DOMRpcResult> replace(final LogicalDatastoreType store,
180             final YangInstanceIdentifier path, final NormalizedNode data,
181             final Optional<ModifyAction> defaultOperation) {
182         LOG.debug("{}: Replace {} {} via actor {}", id, store, path, masterActor);
183
184         masterActor.tell(new ReplaceEditConfigRequest(
185             store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
186         return createResult();
187     }
188
189     @Override
190     public ListenableFuture<? extends DOMRpcResult> create(final LogicalDatastoreType store,
191             final YangInstanceIdentifier path, final NormalizedNode data,
192             final Optional<ModifyAction> defaultOperation) {
193         LOG.debug("{}: Create {} {} via actor {}", id, store, path, masterActor);
194         masterActor.tell(new CreateEditConfigRequest(
195             store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
196         return createResult();
197     }
198
199     @Override
200     public ListenableFuture<? extends DOMRpcResult> delete(final LogicalDatastoreType store,
201             final YangInstanceIdentifier path) {
202         LOG.debug("{}: Delete {} {} via actor {}", id, store, path, masterActor);
203         masterActor.tell(new DeleteEditConfigRequest(store, path), ActorRef.noSender());
204         return createResult();
205     }
206
207     @Override
208     public ListenableFuture<? extends DOMRpcResult> remove(final LogicalDatastoreType store,
209             final YangInstanceIdentifier path) {
210         LOG.debug("{}: Remove {} {} via actor {}", id, store, path, masterActor);
211         masterActor.tell(new RemoveEditConfigRequest(store, path), ActorRef.noSender());
212         return createResult();
213     }
214
215     @Override
216     public ListenableFuture<? extends DOMRpcResult> commit() {
217         LOG.debug("{}: Commit via actor {}", id, masterActor);
218
219         final Future<Object> future = Patterns.ask(masterActor, new CommitRequest(), askTimeout);
220         final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
221         future.onComplete(new OnComplete<>() {
222             @Override
223             public void onComplete(final Throwable failure, final Object response) {
224                 if (failure != null) {
225                     LOG.debug("{}: Commit failed", id, failure);
226                     settableFuture.setException(newNetconfServiceFailedException(processFailure(failure)));
227                 } else if (response instanceof InvokeRpcMessageReply) {
228                     LOG.debug("{}: Commit succeeded", id);
229                     settableFuture.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
230                 } else {
231                     settableFuture.setException(
232                         new ClusteringRpcException("Commit operation returned unexpected type"));
233                     LOG.error("{}: Commit via actor {} returned unexpected type", id, masterActor);
234                 }
235             }
236
237             private NetconfServiceFailedException newNetconfServiceFailedException(final Throwable failure) {
238                 return new NetconfServiceFailedException(String.format("%s: Commit of operation failed",
239                     getDeviceId()), failure);
240             }
241         }, executionContext);
242         return settableFuture;
243     }
244
245     @Override
246     public Object getDeviceId() {
247         return id;
248     }
249
250     private SettableFuture<Optional<NormalizedNode>> read(final Future<Object> future, final LogicalDatastoreType store,
251                                                           final YangInstanceIdentifier path) {
252         final SettableFuture<Optional<NormalizedNode>> settableFuture = SettableFuture.create();
253         future.onComplete(new OnComplete<>() {
254             @Override
255             public void onComplete(final Throwable failure, final Object response) {
256                 if (failure != null) {
257                     LOG.debug("{}: Read {} {} failed", id, store, path, failure);
258
259                     final Throwable processedFailure = processFailure(failure);
260                     if (processedFailure instanceof ReadFailedException) {
261                         settableFuture.setException(processedFailure);
262                     } else {
263                         settableFuture.setException(new ReadFailedException("Read of store " + store + " path " + path
264                             + " failed", processedFailure));
265                     }
266                     return;
267                 }
268
269                 LOG.debug("{}: Read {} {} succeeded: {}", id, store, path, response);
270
271                 if (response instanceof EmptyReadResponse) {
272                     settableFuture.set(Optional.empty());
273                     return;
274                 }
275
276                 if (response instanceof NormalizedNodeMessage) {
277                     final NormalizedNodeMessage data = (NormalizedNodeMessage) response;
278                     settableFuture.set(Optional.of(data.getNode()));
279                 }
280             }
281         }, executionContext);
282
283         return settableFuture;
284     }
285
286     private Throwable processFailure(final Throwable failure) {
287         return failure instanceof AskTimeoutException
288             ? NetconfTopologyUtils.createMasterIsDownException(id, (Exception) failure) : failure;
289     }
290
291     // FIXME: this is being used in contexts where we should be waiting for a reply
292     private static ListenableFuture<? extends DOMRpcResult> createResult() {
293         return Futures.immediateFuture(new DefaultDOMRpcResult());
294     }
295
296     private static DOMRpcResult mapInvokeRpcMessageReplyToDOMRpcResult(final InvokeRpcMessageReply reply) {
297         if (reply.getNormalizedNodeMessage() == null) {
298             return new DefaultDOMRpcResult(new ArrayList<>(reply.getRpcErrors()));
299         } else {
300             return new DefaultDOMRpcResult(reply.getNormalizedNodeMessage().getNode(), reply.getRpcErrors());
301         }
302     }
303 }