2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl.netconf;
10 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
13 import akka.actor.ActorRef;
14 import akka.dispatch.OnComplete;
15 import akka.pattern.AskTimeoutException;
16 import akka.pattern.Patterns;
17 import akka.util.Timeout;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.SettableFuture;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Objects;
24 import java.util.Optional;
25 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
26 import org.opendaylight.mdsal.common.api.ReadFailedException;
27 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
28 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
29 import org.opendaylight.netconf.api.ModifyAction;
30 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
31 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
32 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
33 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
34 import org.opendaylight.netconf.topology.singleton.messages.netconf.CommitRequest;
35 import org.opendaylight.netconf.topology.singleton.messages.netconf.CreateEditConfigRequest;
36 import org.opendaylight.netconf.topology.singleton.messages.netconf.DeleteEditConfigRequest;
37 import org.opendaylight.netconf.topology.singleton.messages.netconf.DiscardChangesRequest;
38 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigRequest;
39 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigWithFieldsRequest;
40 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetRequest;
41 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetWithFieldsRequest;
42 import org.opendaylight.netconf.topology.singleton.messages.netconf.LockRequest;
43 import org.opendaylight.netconf.topology.singleton.messages.netconf.MergeEditConfigRequest;
44 import org.opendaylight.netconf.topology.singleton.messages.netconf.RemoveEditConfigRequest;
45 import org.opendaylight.netconf.topology.singleton.messages.netconf.ReplaceEditConfigRequest;
46 import org.opendaylight.netconf.topology.singleton.messages.netconf.UnlockRequest;
47 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
48 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import scala.concurrent.ExecutionContext;
54 import scala.concurrent.Future;
56 public class ActorProxyNetconfServiceFacade implements ProxyNetconfServiceFacade {
57 private static final Logger LOG = LoggerFactory.getLogger(ActorProxyNetconfServiceFacade.class);
59 private final ActorRef masterActor;
60 private final RemoteDeviceId id;
61 private final ExecutionContext executionContext;
62 private final Timeout askTimeout;
/**
 * Creates a facade that forwards NETCONF service operations to the master actor.
 *
 * @param masterActor actor that receives the request messages
 * @param id device identifier used in log and error messages
 * @param executionContext context on which ask-reply callbacks are run
 * @param askTimeout timeout applied to every ask sent to the master actor
 * @throws NullPointerException if any argument is {@code null}; the message names the offending argument
 */
public ActorProxyNetconfServiceFacade(final ActorRef masterActor, final RemoteDeviceId id,
        final ExecutionContext executionContext, final Timeout askTimeout) {
    // Name each argument so that a failed check points directly at the missing collaborator.
    this.masterActor = Objects.requireNonNull(masterActor, "masterActor");
    this.id = Objects.requireNonNull(id, "id");
    this.executionContext = Objects.requireNonNull(executionContext, "executionContext");
    this.askTimeout = Objects.requireNonNull(askTimeout, "askTimeout");
}
73 public ListenableFuture<DOMRpcResult> lock() {
74 LOG.debug("{}: Lock via actor {}", id, masterActor);
75 final SettableFuture<DOMRpcResult> lockResult = SettableFuture.create();
76 final Future<Object> future = Patterns.ask(masterActor, new LockRequest(), askTimeout);
77 future.onComplete(new OnComplete<>() {
79 public void onComplete(final Throwable failure, final Object response) {
80 if (failure != null) {
81 lockResult.setException(failure);
82 } else if (response instanceof InvokeRpcMessageReply) {
83 lockResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
85 lockResult.setException(new ClusteringRpcException("Lock operation returned unexpected type"));
86 LOG.error("{}: Lock via actor {} returned unexpected type", id, masterActor);
94 public ListenableFuture<DOMRpcResult> unlock() {
95 LOG.debug("{}: Unlock via actor {}", id, masterActor);
96 final SettableFuture<DOMRpcResult> unlockResult = SettableFuture.create();
97 final Future<Object> future = Patterns.ask(masterActor, new UnlockRequest(), askTimeout);
98 future.onComplete(new OnComplete<>() {
100 public void onComplete(final Throwable failure, final Object response) {
101 if (failure != null) {
102 unlockResult.setException(failure);
103 } else if (response instanceof InvokeRpcMessageReply) {
104 unlockResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
106 unlockResult.setException(new ClusteringRpcException("Unlock operation returned unexpected type"));
107 LOG.error("{}: Unlock via actor {} returned unexpected type", id, masterActor);
110 }, executionContext);
115 public ListenableFuture<DOMRpcResult> discardChanges() {
116 LOG.debug("{}: Discard changes via actor {}", id, masterActor);
117 final SettableFuture<DOMRpcResult> discardChangesResult = SettableFuture.create();
118 final Future<Object> future = Patterns.ask(masterActor, new DiscardChangesRequest(), askTimeout);
119 future.onComplete(new OnComplete<>() {
121 public void onComplete(final Throwable failure, final Object response) {
122 if (failure != null) {
123 discardChangesResult.setException(failure);
124 } else if (response instanceof InvokeRpcMessageReply) {
125 discardChangesResult.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
127 discardChangesResult.setException(
128 new ClusteringRpcException("Discard changes operation returned unexpected type"));
129 LOG.error("{}: Discard changes via actor {} returned unexpected type", id, masterActor);
132 }, executionContext);
133 return discardChangesResult;
137 public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path) {
138 LOG.debug("{}: Get {} {} via actor {}", id, OPERATIONAL, path, masterActor);
139 final Future<Object> future = Patterns.ask(masterActor, new GetRequest(path), askTimeout);
140 return read(future, OPERATIONAL, path);
144 public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path,
145 final List<YangInstanceIdentifier> fields) {
146 LOG.debug("{}: Get {} {} with fields {} via actor {}", id, OPERATIONAL, path, fields, masterActor);
147 final Future<Object> future = Patterns.ask(masterActor, new GetWithFieldsRequest(path, fields), askTimeout);
148 return read(future, OPERATIONAL, path);
152 public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path) {
153 LOG.debug("{}: GetConfig {} {} via actor {}", id, CONFIGURATION, path, masterActor);
154 final Future<Object> future = Patterns.ask(masterActor, new GetConfigRequest(path), askTimeout);
155 return read(future, CONFIGURATION, path);
159 public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path,
160 final List<YangInstanceIdentifier> fields) {
161 LOG.debug("{}: GetConfig {} {} with fields {} via actor {}", id, CONFIGURATION, path, fields, masterActor);
162 final Future<Object> future = Patterns.ask(masterActor,
163 new GetConfigWithFieldsRequest(path, fields), askTimeout);
164 return read(future, CONFIGURATION, path);
168 public ListenableFuture<? extends DOMRpcResult> merge(final LogicalDatastoreType store,
169 final YangInstanceIdentifier path, final NormalizedNode data,
170 final Optional<ModifyAction> defaultOperation) {
171 LOG.debug("{}: Merge {} {} via actor {}", id, store, path, masterActor);
172 masterActor.tell(new MergeEditConfigRequest(
173 store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
174 return createResult();
179 public ListenableFuture<? extends DOMRpcResult> replace(final LogicalDatastoreType store,
180 final YangInstanceIdentifier path, final NormalizedNode data,
181 final Optional<ModifyAction> defaultOperation) {
182 LOG.debug("{}: Replace {} {} via actor {}", id, store, path, masterActor);
184 masterActor.tell(new ReplaceEditConfigRequest(
185 store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
186 return createResult();
190 public ListenableFuture<? extends DOMRpcResult> create(final LogicalDatastoreType store,
191 final YangInstanceIdentifier path, final NormalizedNode data,
192 final Optional<ModifyAction> defaultOperation) {
193 LOG.debug("{}: Create {} {} via actor {}", id, store, path, masterActor);
194 masterActor.tell(new CreateEditConfigRequest(
195 store, new NormalizedNodeMessage(path, data), defaultOperation.orElse(null)), ActorRef.noSender());
196 return createResult();
200 public ListenableFuture<? extends DOMRpcResult> delete(final LogicalDatastoreType store,
201 final YangInstanceIdentifier path) {
202 LOG.debug("{}: Delete {} {} via actor {}", id, store, path, masterActor);
203 masterActor.tell(new DeleteEditConfigRequest(store, path), ActorRef.noSender());
204 return createResult();
208 public ListenableFuture<? extends DOMRpcResult> remove(final LogicalDatastoreType store,
209 final YangInstanceIdentifier path) {
210 LOG.debug("{}: Remove {} {} via actor {}", id, store, path, masterActor);
211 masterActor.tell(new RemoveEditConfigRequest(store, path), ActorRef.noSender());
212 return createResult();
216 public ListenableFuture<? extends DOMRpcResult> commit() {
217 LOG.debug("{}: Commit via actor {}", id, masterActor);
219 final Future<Object> future = Patterns.ask(masterActor, new CommitRequest(), askTimeout);
220 final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
221 future.onComplete(new OnComplete<>() {
223 public void onComplete(final Throwable failure, final Object response) {
224 if (failure != null) {
225 LOG.debug("{}: Commit failed", id, failure);
226 settableFuture.setException(newNetconfServiceFailedException(processFailure(failure)));
227 } else if (response instanceof InvokeRpcMessageReply) {
228 LOG.debug("{}: Commit succeeded", id);
229 settableFuture.set(mapInvokeRpcMessageReplyToDOMRpcResult((InvokeRpcMessageReply) response));
231 settableFuture.setException(
232 new ClusteringRpcException("Commit operation returned unexpected type"));
233 LOG.error("{}: Commit via actor {} returned unexpected type", id, masterActor);
237 private NetconfServiceFailedException newNetconfServiceFailedException(final Throwable failure) {
238 return new NetconfServiceFailedException(String.format("%s: Commit of operation failed",
239 getDeviceId()), failure);
241 }, executionContext);
242 return settableFuture;
246 public Object getDeviceId() {
250 private SettableFuture<Optional<NormalizedNode>> read(final Future<Object> future, final LogicalDatastoreType store,
251 final YangInstanceIdentifier path) {
252 final SettableFuture<Optional<NormalizedNode>> settableFuture = SettableFuture.create();
253 future.onComplete(new OnComplete<>() {
255 public void onComplete(final Throwable failure, final Object response) {
256 if (failure != null) {
257 LOG.debug("{}: Read {} {} failed", id, store, path, failure);
259 final Throwable processedFailure = processFailure(failure);
260 if (processedFailure instanceof ReadFailedException) {
261 settableFuture.setException(processedFailure);
263 settableFuture.setException(new ReadFailedException("Read of store " + store + " path " + path
264 + " failed", processedFailure));
269 LOG.debug("{}: Read {} {} succeeded: {}", id, store, path, response);
271 if (response instanceof EmptyReadResponse) {
272 settableFuture.set(Optional.empty());
276 if (response instanceof NormalizedNodeMessage) {
277 final NormalizedNodeMessage data = (NormalizedNodeMessage) response;
278 settableFuture.set(Optional.of(data.getNode()));
281 }, executionContext);
283 return settableFuture;
286 private Throwable processFailure(final Throwable failure) {
287 return failure instanceof AskTimeoutException
288 ? NetconfTopologyUtils.createMasterIsDownException(id, (Exception) failure) : failure;
291 // FIXME: this is being used in contexts where we should be waiting for a reply
292 private static ListenableFuture<? extends DOMRpcResult> createResult() {
293 return Futures.immediateFuture(new DefaultDOMRpcResult());
296 private static DOMRpcResult mapInvokeRpcMessageReplyToDOMRpcResult(final InvokeRpcMessageReply reply) {
297 if (reply.getNormalizedNodeMessage() == null) {
298 return new DefaultDOMRpcResult(new ArrayList<>(reply.getRpcErrors()));
300 return new DefaultDOMRpcResult(reply.getNormalizedNodeMessage().getNode(), reply.getRpcErrors());