2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl.netconf;
10 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
13 import akka.actor.ActorRef;
14 import akka.dispatch.OnComplete;
15 import akka.pattern.AskTimeoutException;
16 import akka.pattern.Patterns;
17 import akka.util.Timeout;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.SettableFuture;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Objects;
24 import java.util.Optional;
25 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
26 import org.opendaylight.mdsal.common.api.ReadFailedException;
27 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
28 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
29 import org.opendaylight.netconf.api.EffectiveOperation;
30 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
31 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
32 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
33 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
34 import org.opendaylight.netconf.topology.singleton.messages.netconf.CommitRequest;
35 import org.opendaylight.netconf.topology.singleton.messages.netconf.CreateEditConfigRequest;
36 import org.opendaylight.netconf.topology.singleton.messages.netconf.DeleteEditConfigRequest;
37 import org.opendaylight.netconf.topology.singleton.messages.netconf.DiscardChangesRequest;
38 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigRequest;
39 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigWithFieldsRequest;
40 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetRequest;
41 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetWithFieldsRequest;
42 import org.opendaylight.netconf.topology.singleton.messages.netconf.LockRequest;
43 import org.opendaylight.netconf.topology.singleton.messages.netconf.MergeEditConfigRequest;
44 import org.opendaylight.netconf.topology.singleton.messages.netconf.RemoveEditConfigRequest;
45 import org.opendaylight.netconf.topology.singleton.messages.netconf.ReplaceEditConfigRequest;
46 import org.opendaylight.netconf.topology.singleton.messages.netconf.UnlockRequest;
47 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
48 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
51 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54 import scala.concurrent.ExecutionContext;
55 import scala.concurrent.Future;
/**
 * {@link ProxyNetconfServiceFacade} implementation that relays NETCONF service operations to a master actor.
 *
 * <p>Lock, unlock, discard-changes, commit and the read operations are sent with the ask pattern and the
 * reply is adapted into a {@link ListenableFuture}. The edit operations (merge, replace, create, delete,
 * remove) are sent with {@code tell} and return an already completed result without waiting for a reply.
 */
57 public class ActorProxyNetconfServiceFacade implements ProxyNetconfServiceFacade {
58 private static final Logger LOG = LoggerFactory.getLogger(ActorProxyNetconfServiceFacade.class);
// Actor that receives every request issued by this facade.
60 private final ActorRef masterActor;
// Identifier of the remote device; used as the prefix of the log messages written by this class.
61 private final RemoteDeviceId id;
// Context on which the callbacks attached to ask replies are executed.
62 private final ExecutionContext executionContext;
// Timeout passed to every Patterns.ask call made by this facade.
63 private final Timeout askTimeout;
/**
 * Creates a facade that relays NETCONF service operations to the given master actor.
 *
 * @param masterActor actor that receives every request
 * @param id identifier of the remote device, used in log messages
 * @param executionContext context on which reply callbacks are executed
 * @param askTimeout timeout applied to requests sent with the ask pattern
 * @throws NullPointerException if any argument is {@code null}
 */
public ActorProxyNetconfServiceFacade(
        final ActorRef masterActor,
        final RemoteDeviceId id,
        final ExecutionContext executionContext,
        final Timeout askTimeout) {
    // Arguments are validated in declaration order, so the first null argument is the one reported.
    this.masterActor = Objects.requireNonNull(masterActor);
    this.id = Objects.requireNonNull(id);
    this.executionContext = Objects.requireNonNull(executionContext);
    this.askTimeout = Objects.requireNonNull(askTimeout);
}
74 public ListenableFuture<DOMRpcResult> lock() {
75 LOG.debug("{}: Lock via actor {}", id, masterActor);
76 final SettableFuture<DOMRpcResult> lockResult = SettableFuture.create();
77 final Future<Object> future = Patterns.ask(masterActor, new LockRequest(), askTimeout);
78 future.onComplete(new OnComplete<>() {
80 public void onComplete(final Throwable failure, final Object response) {
81 if (failure != null) {
82 lockResult.setException(failure);
83 } else if (response instanceof InvokeRpcMessageReply) {
84 lockResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
86 lockResult.setException(new ClusteringRpcException("Lock operation returned unexpected type"));
87 LOG.error("{}: Lock via actor {} returned unexpected type", id, masterActor);
95 public ListenableFuture<DOMRpcResult> unlock() {
96 LOG.debug("{}: Unlock via actor {}", id, masterActor);
97 final SettableFuture<DOMRpcResult> unlockResult = SettableFuture.create();
98 final Future<Object> future = Patterns.ask(masterActor, new UnlockRequest(), askTimeout);
99 future.onComplete(new OnComplete<>() {
101 public void onComplete(final Throwable failure, final Object response) {
102 if (failure != null) {
103 unlockResult.setException(failure);
104 } else if (response instanceof InvokeRpcMessageReply) {
105 unlockResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
107 unlockResult.setException(new ClusteringRpcException("Unlock operation returned unexpected type"));
108 LOG.error("{}: Unlock via actor {} returned unexpected type", id, masterActor);
111 }, executionContext);
116 public ListenableFuture<DOMRpcResult> discardChanges() {
117 LOG.debug("{}: Discard changes via actor {}", id, masterActor);
118 final SettableFuture<DOMRpcResult> discardChangesResult = SettableFuture.create();
119 final Future<Object> future = Patterns.ask(masterActor, new DiscardChangesRequest(), askTimeout);
120 future.onComplete(new OnComplete<>() {
122 public void onComplete(final Throwable failure, final Object response) {
123 if (failure != null) {
124 discardChangesResult.setException(failure);
125 } else if (response instanceof InvokeRpcMessageReply) {
126 discardChangesResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
128 discardChangesResult.setException(
129 new ClusteringRpcException("Discard changes operation returned unexpected type"));
130 LOG.error("{}: Discard changes via actor {} returned unexpected type", id, masterActor);
133 }, executionContext);
134 return discardChangesResult;
138 public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path) {
139 LOG.debug("{}: Get {} {} via actor {}", id, OPERATIONAL, path, masterActor);
140 final Future<Object> future = Patterns.ask(masterActor, new GetRequest(path), askTimeout);
141 return read(future, OPERATIONAL, path);
145 public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path,
146 final List<YangInstanceIdentifier> fields) {
147 LOG.debug("{}: Get {} {} with fields {} via actor {}", id, OPERATIONAL, path, fields, masterActor);
148 final Future<Object> future = Patterns.ask(masterActor, new GetWithFieldsRequest(path, fields), askTimeout);
149 return read(future, OPERATIONAL, path);
153 public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path) {
154 LOG.debug("{}: GetConfig {} {} via actor {}", id, CONFIGURATION, path, masterActor);
155 final Future<Object> future = Patterns.ask(masterActor, new GetConfigRequest(path), askTimeout);
156 return read(future, CONFIGURATION, path);
160 public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path,
161 final List<YangInstanceIdentifier> fields) {
162 LOG.debug("{}: GetConfig {} {} with fields {} via actor {}", id, CONFIGURATION, path, fields, masterActor);
163 final Future<Object> future = Patterns.ask(masterActor,
164 new GetConfigWithFieldsRequest(path, fields), askTimeout);
165 return read(future, CONFIGURATION, path);
169 public ListenableFuture<? extends DOMRpcResult> merge(final LogicalDatastoreType store,
170 final YangInstanceIdentifier path, final NormalizedNode data,
171 final Optional<EffectiveOperation> defaultOperation) {
172 LOG.debug("{}: Merge {} {} via actor {}", id, store, path, masterActor);
173 masterActor.tell(new MergeEditConfigRequest(
174 store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
175 return createResult();
180 public ListenableFuture<? extends DOMRpcResult> replace(final LogicalDatastoreType store,
181 final YangInstanceIdentifier path, final NormalizedNode data,
182 final Optional<EffectiveOperation> defaultOperation) {
183 LOG.debug("{}: Replace {} {} via actor {}", id, store, path, masterActor);
185 masterActor.tell(new ReplaceEditConfigRequest(
186 store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
187 return createResult();
191 public ListenableFuture<? extends DOMRpcResult> create(final LogicalDatastoreType store,
192 final YangInstanceIdentifier path, final NormalizedNode data,
193 final Optional<EffectiveOperation> defaultOperation) {
194 LOG.debug("{}: Create {} {} via actor {}", id, store, path, masterActor);
195 masterActor.tell(new CreateEditConfigRequest(
196 store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
197 return createResult();
201 public ListenableFuture<? extends DOMRpcResult> delete(final LogicalDatastoreType store,
202 final YangInstanceIdentifier path) {
203 LOG.debug("{}: Delete {} {} via actor {}", id, store, path, masterActor);
204 masterActor.tell(new DeleteEditConfigRequest(store, path), ActorRef.noSender());
205 return createResult();
209 public ListenableFuture<? extends DOMRpcResult> remove(final LogicalDatastoreType store,
210 final YangInstanceIdentifier path) {
211 LOG.debug("{}: Remove {} {} via actor {}", id, store, path, masterActor);
212 masterActor.tell(new RemoveEditConfigRequest(store, path), ActorRef.noSender());
213 return createResult();
217 public ListenableFuture<? extends DOMRpcResult> commit() {
218 LOG.debug("{}: Commit via actor {}", id, masterActor);
220 final Future<Object> future = Patterns.ask(masterActor, new CommitRequest(), askTimeout);
221 final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
222 future.onComplete(new OnComplete<>() {
224 public void onComplete(final Throwable failure, final Object response) {
225 if (failure != null) {
226 LOG.debug("{}: Commit failed", id, failure);
227 settableFuture.setException(newNetconfServiceFailedException(processFailure(failure)));
228 } else if (response instanceof InvokeRpcMessageReply) {
229 LOG.debug("{}: Commit succeeded", id);
230 settableFuture.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
232 settableFuture.setException(
233 new ClusteringRpcException("Commit operation returned unexpected type"));
234 LOG.error("{}: Commit via actor {} returned unexpected type", id, masterActor);
238 private NetconfServiceFailedException newNetconfServiceFailedException(final Throwable failure) {
239 return new NetconfServiceFailedException(String.format("%s: Commit of operation failed",
240 getDeviceId()), failure);
242 }, executionContext);
243 return settableFuture;
// NOTE(review): the body of this accessor is not visible in this excerpt; presumably it returns the
// device identifier held in the id field — confirm against the full source.
247 public Object getDeviceId() {
251 private SettableFuture<Optional<NormalizedNode>> read(final Future<Object> future, final LogicalDatastoreType store,
252 final YangInstanceIdentifier path) {
253 final SettableFuture<Optional<NormalizedNode>> settableFuture = SettableFuture.create();
254 future.onComplete(new OnComplete<>() {
256 public void onComplete(final Throwable failure, final Object response) {
257 if (failure != null) {
258 LOG.debug("{}: Read {} {} failed", id, store, path, failure);
260 final Throwable processedFailure = processFailure(failure);
261 if (processedFailure instanceof ReadFailedException) {
262 settableFuture.setException(processedFailure);
264 settableFuture.setException(new ReadFailedException("Read of store " + store + " path " + path
265 + " failed", processedFailure));
270 LOG.debug("{}: Read {} {} succeeded: {}", id, store, path, response);
272 if (response instanceof EmptyReadResponse) {
273 settableFuture.set(Optional.empty());
277 if (response instanceof NormalizedNodeMessage data) {
278 settableFuture.set(Optional.of(data.getNode()));
281 }, executionContext);
283 return settableFuture;
286 private Throwable processFailure(final Throwable failure) {
287 return failure instanceof AskTimeoutException
288 ? NetconfTopologyUtils.createMasterIsDownException(id, (Exception) failure) : failure;
291 // FIXME: this is being used in contexts where we should be waiting for a reply
292 private static ListenableFuture<? extends DOMRpcResult> createResult() {
293 return Futures.immediateFuture(new DefaultDOMRpcResult());
296 private static DOMRpcResult mapInvokeRpcMessageReplyToDOMRpcResult(final InvokeRpcMessageReply reply) {
297 if (reply.getNormalizedNodeMessage() == null) {
298 return new DefaultDOMRpcResult(new ArrayList<>(reply.getRpcErrors()));
300 return new DefaultDOMRpcResult((ContainerNode) reply.getNormalizedNodeMessage().getNode(),
301 reply.getRpcErrors());