2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl.actors;
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.ReceiveTimeout;
13 import akka.actor.Status;
14 import akka.actor.UntypedAbstractActor;
15 import akka.util.JavaDurationConverters;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import java.time.Duration;
21 import java.util.Optional;
22 import java.util.function.Supplier;
23 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
24 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
25 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
26 import org.opendaylight.netconf.topology.singleton.messages.netconf.CommitRequest;
27 import org.opendaylight.netconf.topology.singleton.messages.netconf.CreateEditConfigRequest;
28 import org.opendaylight.netconf.topology.singleton.messages.netconf.DeleteEditConfigRequest;
29 import org.opendaylight.netconf.topology.singleton.messages.netconf.DiscardChangesRequest;
30 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigRequest;
31 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigWithFieldsRequest;
32 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetRequest;
33 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetWithFieldsRequest;
34 import org.opendaylight.netconf.topology.singleton.messages.netconf.LockRequest;
35 import org.opendaylight.netconf.topology.singleton.messages.netconf.MergeEditConfigRequest;
36 import org.opendaylight.netconf.topology.singleton.messages.netconf.RemoveEditConfigRequest;
37 import org.opendaylight.netconf.topology.singleton.messages.netconf.ReplaceEditConfigRequest;
38 import org.opendaylight.netconf.topology.singleton.messages.netconf.UnlockRequest;
39 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
40 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
41 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
47 public final class NetconfDataTreeServiceActor extends UntypedAbstractActor {
// Logger shared by all instances of this actor class.
48 private static final Logger LOG = LoggerFactory.getLogger(NetconfDataTreeServiceActor.class);
// Service that the requests handled by this actor are delegated to.
50 private final NetconfDataTreeService netconfService;
// Idle timeout in whole seconds (the constructor stores Duration.toSeconds() here).
51 private final long idleTimeout;
53 private NetconfDataTreeServiceActor(final NetconfDataTreeService netconfService, final Duration idleTimeout) {
54 this.netconfService = netconfService;
55 this.idleTimeout = idleTimeout.toSeconds();
56 if (this.idleTimeout > 0) {
57 context().setReceiveTimeout(JavaDurationConverters.asFiniteDuration(idleTimeout));
61 static Props props(final NetconfDataTreeService netconfService, final Duration idleTimeout) {
62 return Props.create(NetconfDataTreeServiceActor.class, () ->
63 new NetconfDataTreeServiceActor(netconfService, idleTimeout));
// Dispatches an incoming message to the matching NetconfDataTreeService operation, selected by the
// message's runtime type.
//
// - Read requests (get / get-config, with or without fields): start the read, stop this actor, then
//   hand the future to sendResult so the sender gets the outcome.
// - LockRequest, DiscardChangesRequest, UnlockRequest: run the operation through invokeRpcCall, which
//   replies to the sender; UnlockRequest additionally stops this actor.
// - CommitRequest: handled by submit, which replies to the sender.
// - Replace / create / delete / remove edit-config requests: forwarded to the service; no reply is sent
//   from this method.
// - ReceiveTimeout: logs a warning, invokes discard-changes and unlock, then stops this actor.
//
// NOTE(review): this listing omits some lines of the method (the call made for MergeEditConfigRequest,
// the first argument of the replace/create calls, the argument of the timeout log statement and
// whatever follows the last branch) - confirm those against the full file.
67 public void onReceive(final Object message) {
// NOTE(review): the "with fields" variants are tested before the plain ones - presumably because they
// are subtypes of the plain request types; confirm against the message classes.
68 if (message instanceof GetWithFieldsRequest getRequest) {
69 final YangInstanceIdentifier path = getRequest.getPath();
70 final ListenableFuture<Optional<NormalizedNode>> future = netconfService.get(
71 getRequest.getPath(), getRequest.getFields());
72 context().stop(self());
73 sendResult(future, path, sender(), self());
74 } else if (message instanceof GetRequest getRequest) {
75 final YangInstanceIdentifier path = getRequest.getPath();
76 final ListenableFuture<Optional<NormalizedNode>> future = netconfService.get(path);
77 context().stop(self());
78 sendResult(future, path, sender(), self());
79 } else if (message instanceof GetConfigWithFieldsRequest getConfigRequest) {
80 final YangInstanceIdentifier path = getConfigRequest.getPath();
81 final ListenableFuture<Optional<NormalizedNode>> future = netconfService.getConfig(
82 path, getConfigRequest.getFields());
83 context().stop(self());
84 sendResult(future, path, sender(), self());
85 } else if (message instanceof GetConfigRequest getConfigRequest) {
86 final YangInstanceIdentifier path = getConfigRequest.getPath();
87 final ListenableFuture<Optional<NormalizedNode>> future = netconfService.getConfig(path);
88 context().stop(self());
89 sendResult(future, path, sender(), self());
90 } else if (message instanceof LockRequest) {
91 invokeRpcCall(netconfService::lock, sender(), self());
92 } else if (message instanceof MergeEditConfigRequest request) {
95 request.getNormalizedNodeMessage().getIdentifier(),
96 request.getNormalizedNodeMessage().getNode(),
97 Optional.ofNullable(request.getDefaultOperation()));
98 } else if (message instanceof ReplaceEditConfigRequest request) {
99 netconfService.replace(
101 request.getNormalizedNodeMessage().getIdentifier(),
102 request.getNormalizedNodeMessage().getNode(),
103 Optional.ofNullable(request.getDefaultOperation()));
104 } else if (message instanceof CreateEditConfigRequest request) {
105 netconfService.create(
107 request.getNormalizedNodeMessage().getIdentifier(),
108 request.getNormalizedNodeMessage().getNode(),
109 Optional.ofNullable(request.getDefaultOperation()));
110 } else if (message instanceof DeleteEditConfigRequest request) {
111 netconfService.delete(request.getStore(), request.getPath());
112 } else if (message instanceof RemoveEditConfigRequest request) {
113 netconfService.remove(request.getStore(), request.getPath());
114 } else if (message instanceof CommitRequest) {
115 submit(sender(), self());
116 } else if (message instanceof DiscardChangesRequest) {
117 invokeRpcCall(netconfService::discardChanges, sender(), self());
118 } else if (message instanceof UnlockRequest) {
// The stop is requested before the unlock RPC is issued; the reply is sent from the RPC callback.
119 context().stop(self());
120 invokeRpcCall(netconfService::unlock, sender(), self());
121 } else if (message instanceof ReceiveTimeout) {
122 LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
124 invokeRpcCall(netconfService::discardChanges, sender(), self());
125 invokeRpcCall(netconfService::unlock, sender(), self());
126 context().stop(self());
132 private void submit(final ActorRef requester, final ActorRef self) {
133 Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
135 public void onSuccess(final DOMRpcResult result) {
136 if (result == null) {
137 requester.tell(new EmptyResultResponse(), getSender());
140 NormalizedNodeMessage nodeMessageResp = null;
141 if (result.value() != null) {
142 nodeMessageResp = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), result.value());
144 requester.tell(new InvokeRpcMessageReply(nodeMessageResp, result.errors()), self);
148 public void onFailure(final Throwable throwable) {
149 requester.tell(new Status.Failure(throwable), self);
151 }, MoreExecutors.directExecutor());
154 private void invokeRpcCall(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation,
155 final ActorRef requester, final ActorRef self) {
156 Futures.addCallback(operation.get(), new FutureCallback<DOMRpcResult>() {
158 public void onSuccess(final DOMRpcResult rpcResult) {
159 if (rpcResult == null) {
160 requester.tell(new EmptyResultResponse(), getSender());
163 NormalizedNodeMessage nodeMessageResp = null;
164 if (rpcResult.value() != null) {
165 nodeMessageResp = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), rpcResult.value());
167 requester.tell(new InvokeRpcMessageReply(nodeMessageResp, rpcResult.errors()), self);
171 public void onFailure(final Throwable throwable) {
172 requester.tell(new Status.Failure(throwable), self);
174 }, MoreExecutors.directExecutor());
177 private static void sendResult(final ListenableFuture<Optional<NormalizedNode>> feature,
178 final YangInstanceIdentifier path, final ActorRef sender, final ActorRef self) {
179 Futures.addCallback(feature, new FutureCallback<>() {
181 public void onSuccess(final Optional<NormalizedNode> result) {
182 if (result.isEmpty()) {
183 sender.tell(new EmptyReadResponse(), self);
186 sender.tell(new NormalizedNodeMessage(path, result.get()), self);
190 public void onFailure(final Throwable throwable) {
191 sender.tell(new Status.Failure(throwable), self);
193 }, MoreExecutors.directExecutor());