2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl.actors;
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.ReceiveTimeout;
13 import akka.actor.Status;
14 import akka.actor.UntypedAbstractActor;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.util.Optional;
20 import java.util.function.Supplier;
21 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
22 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
23 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
24 import org.opendaylight.netconf.topology.singleton.messages.netconf.CommitRequest;
25 import org.opendaylight.netconf.topology.singleton.messages.netconf.CreateEditConfigRequest;
26 import org.opendaylight.netconf.topology.singleton.messages.netconf.DeleteEditConfigRequest;
27 import org.opendaylight.netconf.topology.singleton.messages.netconf.DiscardChangesRequest;
28 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigRequest;
29 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetConfigWithFieldsRequest;
30 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetRequest;
31 import org.opendaylight.netconf.topology.singleton.messages.netconf.GetWithFieldsRequest;
32 import org.opendaylight.netconf.topology.singleton.messages.netconf.LockRequest;
33 import org.opendaylight.netconf.topology.singleton.messages.netconf.MergeEditConfigRequest;
34 import org.opendaylight.netconf.topology.singleton.messages.netconf.RemoveEditConfigRequest;
35 import org.opendaylight.netconf.topology.singleton.messages.netconf.ReplaceEditConfigRequest;
36 import org.opendaylight.netconf.topology.singleton.messages.netconf.UnlockRequest;
37 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
38 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
39 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
40 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import scala.concurrent.duration.Duration;
46 public final class NetconfDataTreeServiceActor extends UntypedAbstractActor {
47 private static final Logger LOG = LoggerFactory.getLogger(NetconfDataTreeServiceActor.class);
49 private final NetconfDataTreeService netconfService;
50 private final long idleTimeout;
52 private NetconfDataTreeServiceActor(final NetconfDataTreeService netconfService, final Duration idleTimeout) {
53 this.netconfService = netconfService;
54 this.idleTimeout = idleTimeout.toSeconds();
55 if (this.idleTimeout > 0) {
56 context().setReceiveTimeout(idleTimeout);
60 static Props props(final NetconfDataTreeService netconfService, final Duration idleTimeout) {
61 return Props.create(NetconfDataTreeServiceActor.class, () ->
62 new NetconfDataTreeServiceActor(netconfService, idleTimeout));
66 public void onReceive(final Object message) {
67 if (message instanceof GetWithFieldsRequest) {
68 final GetWithFieldsRequest getRequest = (GetWithFieldsRequest) message;
69 final YangInstanceIdentifier path = getRequest.getPath();
70 final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = netconfService.get(
71 getRequest.getPath(), getRequest.getFields());
72 context().stop(self());
73 sendResult(future, path, sender(), self());
74 } else if (message instanceof GetRequest) {
75 final GetRequest getRequest = (GetRequest) message;
76 final YangInstanceIdentifier path = getRequest.getPath();
77 final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = netconfService.get(path);
78 context().stop(self());
79 sendResult(future, path, sender(), self());
80 } else if (message instanceof GetConfigWithFieldsRequest) {
81 final GetConfigWithFieldsRequest getConfigRequest = (GetConfigWithFieldsRequest) message;
82 final YangInstanceIdentifier path = getConfigRequest.getPath();
83 final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = netconfService.getConfig(
84 path, getConfigRequest.getFields());
85 context().stop(self());
86 sendResult(future, path, sender(), self());
87 } else if (message instanceof GetConfigRequest) {
88 final GetConfigRequest getConfigRequest = (GetConfigRequest) message;
89 final YangInstanceIdentifier path = getConfigRequest.getPath();
90 final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = netconfService.getConfig(path);
91 context().stop(self());
92 sendResult(future, path, sender(), self());
93 } else if (message instanceof LockRequest) {
94 invokeRpcCall(netconfService::lock, sender(), self());
95 } else if (message instanceof MergeEditConfigRequest) {
96 final MergeEditConfigRequest request = (MergeEditConfigRequest) message;
99 request.getNormalizedNodeMessage().getIdentifier(),
100 request.getNormalizedNodeMessage().getNode(),
101 Optional.ofNullable(request.getDefaultOperation()));
102 } else if (message instanceof ReplaceEditConfigRequest) {
103 final ReplaceEditConfigRequest request = (ReplaceEditConfigRequest) message;
104 netconfService.replace(
106 request.getNormalizedNodeMessage().getIdentifier(),
107 request.getNormalizedNodeMessage().getNode(),
108 Optional.ofNullable(request.getDefaultOperation()));
109 } else if (message instanceof CreateEditConfigRequest) {
110 final CreateEditConfigRequest request = (CreateEditConfigRequest) message;
111 netconfService.create(
113 request.getNormalizedNodeMessage().getIdentifier(),
114 request.getNormalizedNodeMessage().getNode(),
115 Optional.ofNullable(request.getDefaultOperation()));
116 } else if (message instanceof DeleteEditConfigRequest) {
117 final DeleteEditConfigRequest request = (DeleteEditConfigRequest) message;
118 netconfService.delete(request.getStore(), request.getPath());
119 } else if (message instanceof RemoveEditConfigRequest) {
120 final RemoveEditConfigRequest request = (RemoveEditConfigRequest) message;
121 netconfService.remove(request.getStore(), request.getPath());
122 } else if (message instanceof CommitRequest) {
123 submit(sender(), self());
124 } else if (message instanceof DiscardChangesRequest) {
125 invokeRpcCall(netconfService::discardChanges, sender(), self());
126 } else if (message instanceof UnlockRequest) {
127 context().stop(self());
128 invokeRpcCall(netconfService::unlock, sender(), self());
129 } else if (message instanceof ReceiveTimeout) {
130 LOG.warn("Haven't received any message for {} seconds, cancelling transaction and stopping actor",
132 invokeRpcCall(netconfService::discardChanges, sender(), self());
133 invokeRpcCall(netconfService::unlock, sender(), self());
134 context().stop(self());
140 private void submit(final ActorRef requester, final ActorRef self) {
141 Futures.addCallback(netconfService.commit(), new FutureCallback<DOMRpcResult>() {
143 public void onSuccess(final DOMRpcResult result) {
144 if (result == null) {
145 requester.tell(new EmptyResultResponse(), getSender());
148 NormalizedNodeMessage nodeMessageResp = null;
149 if (result.getResult() != null) {
150 nodeMessageResp = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), result.getResult());
152 requester.tell(new InvokeRpcMessageReply(nodeMessageResp, result.getErrors()), self);
156 public void onFailure(final Throwable throwable) {
157 requester.tell(new Status.Failure(throwable), self);
159 }, MoreExecutors.directExecutor());
162 private void invokeRpcCall(final Supplier<ListenableFuture<? extends DOMRpcResult>> operation,
163 final ActorRef requester, final ActorRef self) {
164 Futures.addCallback(operation.get(), new FutureCallback<DOMRpcResult>() {
166 public void onSuccess(final DOMRpcResult rpcResult) {
167 if (rpcResult == null) {
168 requester.tell(new EmptyResultResponse(), getSender());
171 NormalizedNodeMessage nodeMessageResp = null;
172 if (rpcResult.getResult() != null) {
173 nodeMessageResp = new NormalizedNodeMessage(YangInstanceIdentifier.empty(), rpcResult.getResult());
175 requester.tell(new InvokeRpcMessageReply(nodeMessageResp, rpcResult.getErrors()), self);
179 public void onFailure(final Throwable throwable) {
180 requester.tell(new Status.Failure(throwable), self);
182 }, MoreExecutors.directExecutor());
185 private static void sendResult(final ListenableFuture<Optional<NormalizedNode<?, ?>>> feature,
186 final YangInstanceIdentifier path, final ActorRef sender, final ActorRef self) {
187 Futures.addCallback(feature, new FutureCallback<>() {
189 public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
190 if (result.isEmpty()) {
191 sender.tell(new EmptyReadResponse(), self);
194 sender.tell(new NormalizedNodeMessage(path, result.get()), self);
198 public void onFailure(final Throwable throwable) {
199 sender.tell(new Status.Failure(throwable), self);
201 }, MoreExecutors.directExecutor());