Add RemoteDeviceServices
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.actors;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.Status.Failure;
13 import akka.actor.Status.Success;
14 import akka.pattern.AskTimeoutException;
15 import akka.util.Timeout;
16 import com.google.common.base.Throwables;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import java.io.IOException;
22 import java.time.Duration;
23 import java.util.List;
24 import java.util.stream.Collectors;
25 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
26 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
27 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
28 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
29 import org.opendaylight.mdsal.dom.api.DOMActionResult;
30 import org.opendaylight.mdsal.dom.api.DOMActionService;
31 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
35 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
36 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
37 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
38 import org.opendaylight.mdsal.dom.api.DOMRpcService;
39 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
40 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices;
41 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
42 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMActionService;
43 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
44 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
45 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
46 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
47 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
48 import org.opendaylight.netconf.topology.singleton.messages.ContainerNodeMessage;
49 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
50 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
51 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
52 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
53 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
54 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
55 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
56 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
57 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
58 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessage;
59 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessageReply;
60 import org.opendaylight.netconf.topology.singleton.messages.netconf.NetconfDataTreeServiceRequest;
61 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
62 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
63 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
64 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
65 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
66 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
67 import org.opendaylight.yangtools.yang.common.QName;
68 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
69 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
70 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
71 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
72 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
73 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
74 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
75 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
76 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
77 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
78
79 public class NetconfNodeActor extends AbstractUntypedActor {
80     private final Duration writeTxIdleTimeout;
81     private final DOMMountPointService mountPointService;
82
83     private SchemaSourceRegistry schemaRegistry;
84     private SchemaRepository schemaRepository;
85     private Timeout actorResponseWaitTime;
86     private RemoteDeviceId id;
87     private NetconfTopologySetup setup;
88     private List<SourceIdentifier> sourceIdentifiers;
89     private DOMRpcService deviceRpc;
90     private DOMActionService deviceAction;
91     private SlaveSalFacade slaveSalManager;
92     private DOMDataBroker deviceDataBroker;
93     private NetconfDataTreeService netconfService;
94     //readTxActor can be shared
95     private ActorRef readTxActor;
96     private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
97
98     public static Props props(final NetconfTopologySetup setup, final RemoteDeviceId id,
99             final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) {
100         return Props.create(NetconfNodeActor.class, () ->
101                 new NetconfNodeActor(setup, id, actorResponseWaitTime, mountPointService));
102     }
103
104     protected NetconfNodeActor(final NetconfTopologySetup setup,
105                                final RemoteDeviceId id, final Timeout actorResponseWaitTime,
106                                final DOMMountPointService mountPointService) {
107         this.setup = setup;
108         this.id = id;
109         schemaRegistry = setup.getSchemaResourcesDTO().getSchemaRegistry();
110         schemaRepository = setup.getSchemaResourcesDTO().getSchemaRepository();
111         this.actorResponseWaitTime = actorResponseWaitTime;
112         writeTxIdleTimeout = setup.getIdleTimeout();
113         this.mountPointService = mountPointService;
114     }
115
116     @SuppressWarnings("checkstyle:IllegalCatch")
117     @Override
118     public void handleReceive(final Object message) {
119         LOG.debug("{}:  received message {}", id, message);
120
121         if (message instanceof CreateInitialMasterActorData masterActorData) { // master
122             sourceIdentifiers = masterActorData.getSourceIndentifiers();
123             deviceDataBroker = masterActorData.getDeviceDataBroker();
124             netconfService = masterActorData.getNetconfDataTreeService();
125             final DOMDataTreeReadTransaction tx = deviceDataBroker.newReadOnlyTransaction();
126             readTxActor = context().actorOf(ReadTransactionActor.props(tx));
127             deviceRpc = masterActorData.getDeviceRpc();
128             deviceAction = masterActorData.getDeviceAction();
129
130             sender().tell(new MasterActorDataInitialized(), self());
131             LOG.debug("{}: Master is ready.", id);
132         } else if (message instanceof RefreshSetupMasterActorData) {
133             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
134             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
135             sender().tell(new MasterActorDataInitialized(), self());
136         } else if (message instanceof AskForMasterMountPoint askForMasterMountPoint) { // master
137             // only master contains reference to deviceDataBroker
138             if (deviceDataBroker != null) {
139                 LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
140                 askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
141                     sender());
142             } else {
143                 LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
144                 sender().tell(new Failure(new NotMasterException(self())), self());
145             }
146         } else if (message instanceof YangTextSchemaSourceRequest yangTextSchemaSourceRequest) { // master
147             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
148         } else if (message instanceof NewReadTransactionRequest) { // master
149             sender().tell(new Success(readTxActor), self());
150         } else if (message instanceof NewWriteTransactionRequest) { // master
151             try {
152                 final DOMDataTreeWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
153                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
154                 sender().tell(new Success(txActor), self());
155             } catch (final Exception t) {
156                 sender().tell(new Failure(t), self());
157             }
158         } else if (message instanceof NewReadWriteTransactionRequest) {
159             try {
160                 final DOMDataTreeReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
161                 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
162                 sender().tell(new Success(txActor), self());
163             } catch (final Exception t) {
164                 sender().tell(new Failure(t), self());
165             }
166         } else if (message instanceof InvokeRpcMessage invokeRpcMessage) { // master
167             invokeSlaveRpc(invokeRpcMessage.getSchemaPath().lastNodeIdentifier(),
168                 invokeRpcMessage.getNormalizedNodeMessage(), sender());
169         } else if (message instanceof InvokeActionMessage invokeActionMessage) { // master
170             LOG.info("InvokeActionMessage Details : {}", invokeActionMessage.toString());
171             invokeSlaveAction(invokeActionMessage.getSchemaPath(), invokeActionMessage.getContainerNodeMessage(),
172                 invokeActionMessage.getDOMDataTreeIdentifier(), sender());
173         } else if (message instanceof RegisterMountPoint registerMountPoint) { //slaves
174             sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
175             registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
176             sender().tell(new Success(null), self());
177         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
178             unregisterSlaveMountPoint();
179         } else if (message instanceof RefreshSlaveActor) { //slave
180             actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
181             id = ((RefreshSlaveActor) message).getId();
182             schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry();
183             setup = ((RefreshSlaveActor) message).getSetup();
184             schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
185         } else if (message instanceof NetconfDataTreeServiceRequest) {
186             ActorRef netconfActor = context()
187                 .actorOf(NetconfDataTreeServiceActor.props(netconfService, writeTxIdleTimeout));
188             sender().tell(new Success(netconfActor), self());
189         }
190     }
191
192     @Override
193     public void postStop() throws Exception {
194         try {
195             super.postStop();
196         } finally {
197             unregisterSlaveMountPoint();
198         }
199     }
200
201     private void unregisterSlaveMountPoint() {
202         if (slaveSalManager != null) {
203             slaveSalManager.close();
204             slaveSalManager = null;
205         }
206
207         closeSchemaSourceRegistrations();
208     }
209
210     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
211         final ListenableFuture<YangTextSchemaSource> schemaSourceFuture =
212                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
213
214         Futures.addCallback(schemaSourceFuture, new FutureCallback<YangTextSchemaSource>() {
215             @Override
216             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
217                 try {
218                     LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier);
219                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
220                 } catch (IOException e) {
221                     sender.tell(new Failure(e), getSelf());
222                 }
223             }
224
225             @Override
226             public void onFailure(final Throwable throwable) {
227                 LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable);
228                 sender.tell(new Failure(throwable), getSelf());
229             }
230         }, MoreExecutors.directExecutor());
231     }
232
233     private void invokeSlaveRpc(final QName qname, final NormalizedNodeMessage normalizedNodeMessage,
234                                 final ActorRef recipient) {
235
236         LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, qname, normalizedNodeMessage,
237                 deviceRpc);
238
239         final ListenableFuture<? extends DOMRpcResult> rpcResult = deviceRpc.invokeRpc(qname,
240                 normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null);
241
242         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
243             @Override
244             public void onSuccess(final DOMRpcResult domRpcResult) {
245                 LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, qname, domRpcResult);
246
247                 if (domRpcResult == null) {
248                     recipient.tell(new EmptyResultResponse(), getSender());
249                     return;
250                 }
251                 NormalizedNodeMessage nodeMessageReply = null;
252                 if (domRpcResult.getResult() != null) {
253                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.empty(),
254                             domRpcResult.getResult());
255                 }
256                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
257             }
258
259             @Override
260             public void onFailure(final Throwable throwable) {
261                 recipient.tell(new Failure(throwable), getSelf());
262             }
263         }, MoreExecutors.directExecutor());
264     }
265
266     /**
267      * Invoking Action on Slave Node in Odl Cluster Environment.
268      *
269      * @param schemaPath {@link Absolute}
270      * @param containerNodeMessage {@link ContainerNodeMessage}
271      * @param domDataTreeIdentifier {@link DOMDataTreeIdentifier}
272      * @param recipient {@link ActorRef}
273      */
274     private void invokeSlaveAction(final Absolute schemaPath, final ContainerNodeMessage containerNodeMessage,
275         final DOMDataTreeIdentifier domDataTreeIdentifier, final ActorRef recipient) {
276         LOG.info("{}: invokeSlaveAction for {}, input: {}, identifier: {} on action service {}", id, schemaPath,
277             containerNodeMessage, domDataTreeIdentifier, deviceAction);
278
279         final ListenableFuture<? extends DOMActionResult> actionResult = deviceAction.invokeAction(schemaPath,
280             domDataTreeIdentifier, containerNodeMessage != null ? containerNodeMessage.getNode() : null);
281
282         Futures.addCallback(actionResult, new FutureCallback<DOMActionResult>() {
283
284             @Override
285             public void onSuccess(final DOMActionResult domActionResult) {
286                 LOG.debug("{}: invokeSlaveAction for {}, domActionResult: {}", id, schemaPath, domActionResult);
287                 if (domActionResult == null) {
288                     recipient.tell(new EmptyResultResponse(), getSender());
289                     return;
290                 }
291
292                 //Check DomActionResult containing Ok onSuccess pass empty nodeMessageReply
293                 ContainerNodeMessage nodeMessageReply = domActionResult.getOutput().map(ContainerNodeMessage::new)
294                         .orElse(null);
295                 recipient.tell(new InvokeActionMessageReply(nodeMessageReply, domActionResult.getErrors()), getSelf());
296             }
297
298             @Override
299             public void onFailure(final Throwable throwable) {
300                 recipient.tell(new Failure(throwable), getSelf());
301             }
302         }, MoreExecutors.directExecutor());
303     }
304
305     private void registerSlaveMountPoint(final ActorRef masterReference) {
306         unregisterSlaveMountPoint();
307
308         slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService);
309
310         resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
311     }
312
313     private EffectiveModelContextFactory createSchemaContextFactory(final ActorRef masterReference) {
314         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
315                 new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime);
316         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
317                 getContext().dispatcher());
318
319         registeredSchemas = sourceIdentifiers.stream()
320                 .map(sourceId ->
321                         schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
322                                 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
323                 .collect(Collectors.toList());
324
325         return schemaRepository.createEffectiveModelContextFactory();
326     }
327
328     private void resolveSchemaContext(final EffectiveModelContextFactory schemaContextFactory,
329             final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, final int tries) {
330         final ListenableFuture<EffectiveModelContext> schemaContextFuture =
331                 schemaContextFactory.createEffectiveModelContext(sourceIdentifiers);
332         Futures.addCallback(schemaContextFuture, new FutureCallback<EffectiveModelContext>() {
333             @Override
334             public void onSuccess(final EffectiveModelContext result) {
335                 executeInSelf(() -> {
336                     // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
337                     // resolution.
338                     if (slaveSalManager == localSlaveSalManager) {
339                         LOG.info("{}: Schema context resolved: {} - registering slave mount point",
340                                 id, result.getModules());
341                         final var actorSystem = setup.getActorSystem();
342                         slaveSalManager.registerSlaveMountPoint(result, masterReference, new RemoteDeviceServices(
343                             new ProxyDOMRpcService(actorSystem, masterReference, id, actorResponseWaitTime),
344                             new ProxyDOMActionService(actorSystem, masterReference, id, actorResponseWaitTime)));
345                     }
346                 });
347             }
348
349             @Override
350             public void onFailure(final Throwable throwable) {
351                 executeInSelf(() -> {
352                     if (slaveSalManager == localSlaveSalManager) {
353                         final Throwable cause = Throwables.getRootCause(throwable);
354                         if (cause instanceof AskTimeoutException) {
355                             if (tries <= 5 || tries % 10 == 0) {
356                                 LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable);
357                             }
358
359                             resolveSchemaContext(schemaContextFactory, localSlaveSalManager,
360                                     masterReference, tries + 1);
361                         } else {
362                             LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
363                                     id, throwable);
364                             closeSchemaSourceRegistrations();
365                         }
366                     }
367                 });
368             }
369         }, MoreExecutors.directExecutor());
370     }
371
372     private void closeSchemaSourceRegistrations() {
373         if (registeredSchemas != null) {
374             registeredSchemas.forEach(SchemaSourceRegistration::close);
375             registeredSchemas = null;
376         }
377     }
378 }