Bump upstreams
[netconf.git] / apps / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.actors;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.Status.Failure;
13 import akka.actor.Status.Success;
14 import akka.pattern.AskTimeoutException;
15 import akka.util.Timeout;
16 import com.google.common.base.Throwables;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import java.io.IOException;
22 import java.time.Duration;
23 import java.util.List;
24 import java.util.stream.Collectors;
25 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
26 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
27 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
28 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
29 import org.opendaylight.mdsal.dom.api.DOMActionResult;
30 import org.opendaylight.mdsal.dom.api.DOMActionService;
31 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
35 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
36 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
37 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
38 import org.opendaylight.mdsal.dom.api.DOMRpcService;
39 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
40 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
41 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Actions;
42 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
43 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
44 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMActionService;
45 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
46 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
47 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
48 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
49 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
50 import org.opendaylight.netconf.topology.singleton.messages.ContainerNodeMessage;
51 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
52 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
53 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
54 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
55 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
56 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
57 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
58 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
59 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
60 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessage;
61 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessageReply;
62 import org.opendaylight.netconf.topology.singleton.messages.netconf.NetconfDataTreeServiceRequest;
63 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
64 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
65 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
66 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
67 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
68 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
69 import org.opendaylight.yangtools.concepts.Registration;
70 import org.opendaylight.yangtools.yang.common.QName;
71 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
72 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
73 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
74 import org.opendaylight.yangtools.yang.model.api.source.SourceIdentifier;
75 import org.opendaylight.yangtools.yang.model.api.source.YangTextSource;
76 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
77 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
78 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
79 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
80 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
81
82 public class NetconfNodeActor extends AbstractUntypedActor {
83     private final Duration writeTxIdleTimeout;
84     private final DOMMountPointService mountPointService;
85
86     private SchemaSourceRegistry schemaRegistry;
87     private SchemaRepository schemaRepository;
88     private Timeout actorResponseWaitTime;
89     private RemoteDeviceId id;
90     private NetconfTopologySetup setup;
91     private List<SourceIdentifier> sourceIdentifiers = null;
92     private DOMRpcService deviceRpc = null;
93     private DOMActionService deviceAction = null;
94     private SlaveSalFacade slaveSalManager;
95     private DOMDataBroker deviceDataBroker;
96     private NetconfDataTreeService netconfService;
97     //readTxActor can be shared
98     private ActorRef readTxActor;
99     private List<Registration> registeredSchemas;
100
101     public static Props props(final NetconfTopologySetup setup, final RemoteDeviceId id,
102             final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) {
103         return Props.create(NetconfNodeActor.class, () ->
104                 new NetconfNodeActor(setup, id, actorResponseWaitTime, mountPointService));
105     }
106
107     protected NetconfNodeActor(final NetconfTopologySetup setup,
108                                final RemoteDeviceId id, final Timeout actorResponseWaitTime,
109                                final DOMMountPointService mountPointService) {
110         this.setup = setup;
111         this.id = id;
112         schemaRegistry = setup.getSchemaResourcesDTO().getSchemaRegistry();
113         schemaRepository = setup.getSchemaResourcesDTO().getSchemaRepository();
114         this.actorResponseWaitTime = actorResponseWaitTime;
115         writeTxIdleTimeout = setup.getIdleTimeout();
116         this.mountPointService = mountPointService;
117     }
118
119     @SuppressWarnings("checkstyle:IllegalCatch")
120     @Override
121     public void handleReceive(final Object message) {
122         LOG.debug("{}:  received message {}", id, message);
123
124         if (message instanceof CreateInitialMasterActorData masterActorData) { // master
125             sourceIdentifiers = masterActorData.getSourceIndentifiers();
126             deviceDataBroker = masterActorData.getDeviceDataBroker();
127             netconfService = masterActorData.getNetconfDataTreeService();
128             final DOMDataTreeReadTransaction tx = deviceDataBroker.newReadOnlyTransaction();
129             readTxActor = context().actorOf(ReadTransactionActor.props(tx));
130
131             final var deviceServices = masterActorData.getDeviceServices();
132             deviceRpc = deviceServices.rpcs() instanceof Rpcs.Normalized normalized ? normalized.domRpcService() : null;
133             deviceAction = deviceServices.actions() instanceof Actions.Normalized normalized ? normalized : null;
134
135             sender().tell(new MasterActorDataInitialized(), self());
136             LOG.debug("{}: Master is ready.", id);
137         } else if (message instanceof RefreshSetupMasterActorData) {
138             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
139             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
140             sender().tell(new MasterActorDataInitialized(), self());
141         } else if (message instanceof AskForMasterMountPoint askForMasterMountPoint) { // master
142             // only master contains reference to deviceDataBroker
143             if (deviceDataBroker != null) {
144                 LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
145                 askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
146                     sender());
147             } else {
148                 LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
149                 sender().tell(new Failure(new NotMasterException(self())), self());
150             }
151         } else if (message instanceof YangTextSchemaSourceRequest yangTextSchemaSourceRequest) { // master
152             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
153         } else if (message instanceof NewReadTransactionRequest) { // master
154             sender().tell(new Success(readTxActor), self());
155         } else if (message instanceof NewWriteTransactionRequest) { // master
156             try {
157                 final DOMDataTreeWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
158                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
159                 sender().tell(new Success(txActor), self());
160             } catch (final Exception t) {
161                 sender().tell(new Failure(t), self());
162             }
163         } else if (message instanceof NewReadWriteTransactionRequest) {
164             try {
165                 final DOMDataTreeReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
166                 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
167                 sender().tell(new Success(txActor), self());
168             } catch (final Exception t) {
169                 sender().tell(new Failure(t), self());
170             }
171         } else if (message instanceof InvokeRpcMessage invokeRpcMessage) { // master
172             invokeSlaveRpc(invokeRpcMessage.getSchemaPath().lastNodeIdentifier(),
173                 invokeRpcMessage.getNormalizedNodeMessage(), sender());
174         } else if (message instanceof InvokeActionMessage invokeActionMessage) { // master
175             LOG.info("InvokeActionMessage Details : {}", invokeActionMessage.toString());
176             invokeSlaveAction(invokeActionMessage.getSchemaPath(), invokeActionMessage.getContainerNodeMessage(),
177                 invokeActionMessage.getDOMDataTreeIdentifier(), sender());
178         } else if (message instanceof RegisterMountPoint registerMountPoint) { //slaves
179             sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
180             registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
181             sender().tell(new Success(null), self());
182         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
183             unregisterSlaveMountPoint();
184         } else if (message instanceof RefreshSlaveActor refreshSlave) { //slave
185             actorResponseWaitTime = refreshSlave.getActorResponseWaitTime();
186             id = refreshSlave.getId();
187             schemaRegistry = refreshSlave.getSchemaRegistry();
188             setup = refreshSlave.getSetup();
189             schemaRepository = refreshSlave.getSchemaRepository();
190         } else if (message instanceof NetconfDataTreeServiceRequest) {
191             ActorRef netconfActor = context()
192                 .actorOf(NetconfDataTreeServiceActor.props(netconfService, writeTxIdleTimeout));
193             sender().tell(new Success(netconfActor), self());
194         }
195     }
196
197     @Override
198     public void postStop() throws Exception {
199         try {
200             super.postStop();
201         } finally {
202             unregisterSlaveMountPoint();
203         }
204     }
205
206     private void unregisterSlaveMountPoint() {
207         if (slaveSalManager != null) {
208             slaveSalManager.close();
209             slaveSalManager = null;
210         }
211
212         closeSchemaSourceRegistrations();
213     }
214
215     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
216         Futures.addCallback(schemaRepository.getSchemaSource(sourceIdentifier, YangTextSource.class),
217             new FutureCallback<>() {
218                 @Override
219                 public void onSuccess(final YangTextSource yangTextSchemaSource) {
220                     try {
221                         LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier);
222                         sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
223                     } catch (IOException e) {
224                         sender.tell(new Failure(e), getSelf());
225                     }
226                 }
227
228                 @Override
229                 public void onFailure(final Throwable throwable) {
230                     LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable);
231                     sender.tell(new Failure(throwable), getSelf());
232                 }
233             }, MoreExecutors.directExecutor());
234     }
235
236     private void invokeSlaveRpc(final QName qname, final NormalizedNodeMessage normalizedNodeMessage,
237                                 final ActorRef recipient) {
238         LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, qname, normalizedNodeMessage,
239                 deviceRpc);
240
241         final ListenableFuture<? extends DOMRpcResult> rpcResult = deviceRpc.invokeRpc(qname,
242                 normalizedNodeMessage != null ? (ContainerNode) normalizedNodeMessage.getNode() : null);
243
244         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
245             @Override
246             public void onSuccess(final DOMRpcResult domRpcResult) {
247                 LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, qname, domRpcResult);
248
249                 if (domRpcResult == null) {
250                     recipient.tell(new EmptyResultResponse(), getSender());
251                     return;
252                 }
253                 NormalizedNodeMessage nodeMessageReply = null;
254                 if (domRpcResult.value() != null) {
255                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.of(), domRpcResult.value());
256                 }
257                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.errors()), getSelf());
258             }
259
260             @Override
261             public void onFailure(final Throwable throwable) {
262                 recipient.tell(new Failure(throwable), getSelf());
263             }
264         }, MoreExecutors.directExecutor());
265     }
266
267     /**
268      * Invoking Action on Slave Node in Odl Cluster Environment.
269      *
270      * @param schemaPath {@link Absolute}
271      * @param containerNodeMessage {@link ContainerNodeMessage}
272      * @param domDataTreeIdentifier {@link DOMDataTreeIdentifier}
273      * @param recipient {@link ActorRef}
274      */
275     private void invokeSlaveAction(final Absolute schemaPath, final ContainerNodeMessage containerNodeMessage,
276             final DOMDataTreeIdentifier domDataTreeIdentifier, final ActorRef recipient) {
277         LOG.info("{}: invokeSlaveAction for {}, input: {}, identifier: {} on action service {}", id, schemaPath,
278             containerNodeMessage, domDataTreeIdentifier, deviceAction);
279
280         final ListenableFuture<? extends DOMActionResult> actionResult = deviceAction.invokeAction(schemaPath,
281             domDataTreeIdentifier, containerNodeMessage != null ? containerNodeMessage.getNode() : null);
282
283         Futures.addCallback(actionResult, new FutureCallback<DOMActionResult>() {
284
285             @Override
286             public void onSuccess(final DOMActionResult domActionResult) {
287                 LOG.debug("{}: invokeSlaveAction for {}, domActionResult: {}", id, schemaPath, domActionResult);
288                 if (domActionResult == null) {
289                     recipient.tell(new EmptyResultResponse(), getSender());
290                     return;
291                 }
292
293                 //Check DomActionResult containing Ok onSuccess pass empty nodeMessageReply
294                 ContainerNodeMessage nodeMessageReply = domActionResult.getOutput().map(ContainerNodeMessage::new)
295                         .orElse(null);
296                 recipient.tell(new InvokeActionMessageReply(nodeMessageReply, domActionResult.getErrors()), getSelf());
297             }
298
299             @Override
300             public void onFailure(final Throwable throwable) {
301                 recipient.tell(new Failure(throwable), getSelf());
302             }
303         }, MoreExecutors.directExecutor());
304     }
305
306     private void registerSlaveMountPoint(final ActorRef masterReference) {
307         unregisterSlaveMountPoint();
308
309         slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService);
310
311         resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
312     }
313
314     private EffectiveModelContextFactory createSchemaContextFactory(final ActorRef masterReference) {
315         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
316                 new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime);
317         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
318                 getContext().dispatcher());
319
320         registeredSchemas = sourceIdentifiers.stream()
321                 .map(sourceId ->
322                         schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
323                                 YangTextSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
324                 .collect(Collectors.toList());
325
326         return schemaRepository.createEffectiveModelContextFactory();
327     }
328
329     private void resolveSchemaContext(final EffectiveModelContextFactory schemaContextFactory,
330             final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, final int tries) {
331         final var schemaContextFuture = schemaContextFactory.createEffectiveModelContext(sourceIdentifiers);
332         Futures.addCallback(schemaContextFuture, new FutureCallback<>() {
333             @Override
334             public void onSuccess(final EffectiveModelContext result) {
335                 executeInSelf(() -> {
336                     // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
337                     // resolution.
338                     if (slaveSalManager == localSlaveSalManager) {
339                         LOG.info("{}: Schema context resolved: {} - registering slave mount point", id,
340                             result.getModules());
341                         final var actorSystem = setup.getActorSystem();
342                         final var rpcProxy = new ProxyDOMRpcService(actorSystem, masterReference, id,
343                             actorResponseWaitTime);
344                         slaveSalManager.registerSlaveMountPoint(result, masterReference, new RemoteDeviceServices(
345                             (Rpcs.Normalized) () -> rpcProxy,
346                             new ProxyDOMActionService(actorSystem, masterReference, id, actorResponseWaitTime)));
347                     }
348                 });
349             }
350
351             @Override
352             public void onFailure(final Throwable throwable) {
353                 executeInSelf(() -> {
354                     if (slaveSalManager == localSlaveSalManager) {
355                         final Throwable cause = Throwables.getRootCause(throwable);
356                         if (cause instanceof AskTimeoutException) {
357                             if (tries <= 5 || tries % 10 == 0) {
358                                 LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable);
359                             }
360
361                             resolveSchemaContext(schemaContextFactory, localSlaveSalManager,
362                                     masterReference, tries + 1);
363                         } else {
364                             LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
365                                     id, throwable);
366                             closeSchemaSourceRegistrations();
367                         }
368                     }
369                 });
370             }
371         }, MoreExecutors.directExecutor());
372     }
373
374     private void closeSchemaSourceRegistrations() {
375         if (registeredSchemas != null) {
376             registeredSchemas.forEach(Registration::close);
377             registeredSchemas = null;
378         }
379     }
380 }