Eliminate SchemaResourcesDTO
[netconf.git] / apps / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.actors;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.Status.Failure;
13 import akka.actor.Status.Success;
14 import akka.pattern.AskTimeoutException;
15 import akka.util.Timeout;
16 import com.google.common.base.Throwables;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import java.io.IOException;
22 import java.time.Duration;
23 import java.util.List;
24 import java.util.stream.Collectors;
25 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
26 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
27 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
28 import org.opendaylight.mdsal.dom.api.DOMActionResult;
29 import org.opendaylight.mdsal.dom.api.DOMActionService;
30 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
35 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
36 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
37 import org.opendaylight.mdsal.dom.api.DOMRpcService;
38 import org.opendaylight.netconf.client.mdsal.api.DeviceNetconfSchemaProvider;
39 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
40 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
41 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Actions;
42 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
43 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
44 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMActionService;
45 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
46 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
47 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
48 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
49 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
50 import org.opendaylight.netconf.topology.singleton.messages.ContainerNodeMessage;
51 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
52 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
53 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
54 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
55 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
56 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
57 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
58 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
59 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
60 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessage;
61 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessageReply;
62 import org.opendaylight.netconf.topology.singleton.messages.netconf.NetconfDataTreeServiceRequest;
63 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
64 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
65 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
66 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
67 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
68 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
69 import org.opendaylight.yangtools.concepts.Registration;
70 import org.opendaylight.yangtools.yang.common.QName;
71 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
72 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
73 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
74 import org.opendaylight.yangtools.yang.model.api.source.SourceIdentifier;
75 import org.opendaylight.yangtools.yang.model.api.source.YangTextSource;
76 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
77 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
78 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
79
80 public class NetconfNodeActor extends AbstractUntypedActor {
81     private final Duration writeTxIdleTimeout;
82     private final DOMMountPointService mountPointService;
83
84     private DeviceNetconfSchemaProvider schemaProvider;
85     private Timeout actorResponseWaitTime;
86     private RemoteDeviceId id;
87     private List<SourceIdentifier> sourceIdentifiers = null;
88     private DOMRpcService deviceRpc = null;
89     private DOMActionService deviceAction = null;
90     private SlaveSalFacade slaveSalManager;
91     private DOMDataBroker deviceDataBroker;
92     private NetconfDataTreeService netconfService;
93     //readTxActor can be shared
94     private ActorRef readTxActor;
95     private List<Registration> registeredSchemas;
96
97     protected NetconfNodeActor(final NetconfTopologySetup setup, final RemoteDeviceId id,
98             final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) {
99         this.id = id;
100         schemaProvider = setup.getDeviceSchemaProvider();
101         this.actorResponseWaitTime = actorResponseWaitTime;
102         writeTxIdleTimeout = setup.getIdleTimeout();
103         this.mountPointService = mountPointService;
104     }
105
106     public static Props props(final NetconfTopologySetup setup, final RemoteDeviceId id,
107             final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) {
108         return Props.create(NetconfNodeActor.class, () ->
109                 new NetconfNodeActor(setup, id, actorResponseWaitTime, mountPointService));
110     }
111
112     @SuppressWarnings("checkstyle:IllegalCatch")
113     @Override
114     public void handleReceive(final Object message) {
115         LOG.debug("{}:  received message {}", id, message);
116
117         if (message instanceof CreateInitialMasterActorData masterActorData) { // master
118             sourceIdentifiers = masterActorData.getSourceIndentifiers();
119             deviceDataBroker = masterActorData.getDeviceDataBroker();
120             netconfService = masterActorData.getNetconfDataTreeService();
121             final DOMDataTreeReadTransaction tx = deviceDataBroker.newReadOnlyTransaction();
122             readTxActor = context().actorOf(ReadTransactionActor.props(tx));
123
124             final var deviceServices = masterActorData.getDeviceServices();
125             deviceRpc = deviceServices.rpcs() instanceof Rpcs.Normalized normalized ? normalized.domRpcService() : null;
126             deviceAction = deviceServices.actions() instanceof Actions.Normalized normalized ? normalized : null;
127
128             sender().tell(new MasterActorDataInitialized(), self());
129             LOG.debug("{}: Master is ready.", id);
130         } else if (message instanceof RefreshSetupMasterActorData masterActorData) {
131             id = masterActorData.getRemoteDeviceId();
132             schemaProvider = masterActorData.getNetconfTopologyDeviceSetup().getDeviceSchemaProvider();
133             sender().tell(new MasterActorDataInitialized(), self());
134         } else if (message instanceof AskForMasterMountPoint askForMasterMountPoint) { // master
135             // only master contains reference to deviceDataBroker
136             if (deviceDataBroker != null) {
137                 LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
138                 askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
139                     sender());
140             } else {
141                 LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
142                 sender().tell(new Failure(new NotMasterException(self())), self());
143             }
144         } else if (message instanceof YangTextSchemaSourceRequest yangTextSchemaSourceRequest) { // master
145             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
146         } else if (message instanceof NewReadTransactionRequest) { // master
147             sender().tell(new Success(readTxActor), self());
148         } else if (message instanceof NewWriteTransactionRequest) { // master
149             try {
150                 final DOMDataTreeWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
151                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
152                 sender().tell(new Success(txActor), self());
153             } catch (final Exception t) {
154                 sender().tell(new Failure(t), self());
155             }
156         } else if (message instanceof NewReadWriteTransactionRequest) {
157             try {
158                 final DOMDataTreeReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
159                 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
160                 sender().tell(new Success(txActor), self());
161             } catch (final Exception t) {
162                 sender().tell(new Failure(t), self());
163             }
164         } else if (message instanceof InvokeRpcMessage invokeRpcMessage) { // master
165             invokeSlaveRpc(invokeRpcMessage.getSchemaPath().lastNodeIdentifier(),
166                 invokeRpcMessage.getNormalizedNodeMessage(), sender());
167         } else if (message instanceof InvokeActionMessage invokeActionMessage) { // master
168             LOG.info("InvokeActionMessage Details : {}", invokeActionMessage.toString());
169             invokeSlaveAction(invokeActionMessage.getSchemaPath(), invokeActionMessage.getContainerNodeMessage(),
170                 invokeActionMessage.getDOMDataTreeIdentifier(), sender());
171         } else if (message instanceof RegisterMountPoint registerMountPoint) { //slaves
172             sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
173             registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
174             sender().tell(new Success(null), self());
175         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
176             unregisterSlaveMountPoint();
177         } else if (message instanceof RefreshSlaveActor refreshSlave) { //slave
178             actorResponseWaitTime = refreshSlave.getActorResponseWaitTime();
179             id = refreshSlave.getId();
180             schemaProvider = refreshSlave.getSetup().getDeviceSchemaProvider();
181         } else if (message instanceof NetconfDataTreeServiceRequest) {
182             final var netconfActor = context().actorOf(NetconfDataTreeServiceActor.props(netconfService,
183                 writeTxIdleTimeout));
184             sender().tell(new Success(netconfActor), self());
185         }
186     }
187
188     @Override
189     public void postStop() throws Exception {
190         try {
191             super.postStop();
192         } finally {
193             unregisterSlaveMountPoint();
194         }
195     }
196
197     private void unregisterSlaveMountPoint() {
198         if (slaveSalManager != null) {
199             slaveSalManager.close();
200             slaveSalManager = null;
201         }
202
203         closeSchemaSourceRegistrations();
204     }
205
206     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
207         Futures.addCallback(schemaProvider.repository().getSchemaSource(sourceIdentifier, YangTextSource.class),
208             new FutureCallback<>() {
209                 @Override
210                 public void onSuccess(final YangTextSource yangTextSchemaSource) {
211                     try {
212                         LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier);
213                         sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
214                     } catch (IOException e) {
215                         sender.tell(new Failure(e), getSelf());
216                     }
217                 }
218
219                 @Override
220                 public void onFailure(final Throwable throwable) {
221                     LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable);
222                     sender.tell(new Failure(throwable), getSelf());
223                 }
224             }, MoreExecutors.directExecutor());
225     }
226
227     private void invokeSlaveRpc(final QName qname, final NormalizedNodeMessage normalizedNodeMessage,
228                                 final ActorRef recipient) {
229         LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, qname, normalizedNodeMessage,
230                 deviceRpc);
231
232         final ListenableFuture<? extends DOMRpcResult> rpcResult = deviceRpc.invokeRpc(qname,
233                 normalizedNodeMessage != null ? (ContainerNode) normalizedNodeMessage.getNode() : null);
234
235         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
236             @Override
237             public void onSuccess(final DOMRpcResult domRpcResult) {
238                 LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, qname, domRpcResult);
239
240                 if (domRpcResult == null) {
241                     recipient.tell(new EmptyResultResponse(), getSender());
242                     return;
243                 }
244                 NormalizedNodeMessage nodeMessageReply = null;
245                 if (domRpcResult.value() != null) {
246                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.of(), domRpcResult.value());
247                 }
248                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.errors()), getSelf());
249             }
250
251             @Override
252             public void onFailure(final Throwable throwable) {
253                 recipient.tell(new Failure(throwable), getSelf());
254             }
255         }, MoreExecutors.directExecutor());
256     }
257
258     /**
259      * Invoking Action on Slave Node in Odl Cluster Environment.
260      *
261      * @param schemaPath {@link Absolute}
262      * @param containerNodeMessage {@link ContainerNodeMessage}
263      * @param domDataTreeIdentifier {@link DOMDataTreeIdentifier}
264      * @param recipient {@link ActorRef}
265      */
266     private void invokeSlaveAction(final Absolute schemaPath, final ContainerNodeMessage containerNodeMessage,
267             final DOMDataTreeIdentifier domDataTreeIdentifier, final ActorRef recipient) {
268         LOG.info("{}: invokeSlaveAction for {}, input: {}, identifier: {} on action service {}", id, schemaPath,
269             containerNodeMessage, domDataTreeIdentifier, deviceAction);
270
271         final ListenableFuture<? extends DOMActionResult> actionResult = deviceAction.invokeAction(schemaPath,
272             domDataTreeIdentifier, containerNodeMessage != null ? containerNodeMessage.getNode() : null);
273
274         Futures.addCallback(actionResult, new FutureCallback<DOMActionResult>() {
275
276             @Override
277             public void onSuccess(final DOMActionResult domActionResult) {
278                 LOG.debug("{}: invokeSlaveAction for {}, domActionResult: {}", id, schemaPath, domActionResult);
279                 if (domActionResult == null) {
280                     recipient.tell(new EmptyResultResponse(), getSender());
281                     return;
282                 }
283
284                 //Check DomActionResult containing Ok onSuccess pass empty nodeMessageReply
285                 ContainerNodeMessage nodeMessageReply = domActionResult.getOutput().map(ContainerNodeMessage::new)
286                         .orElse(null);
287                 recipient.tell(new InvokeActionMessageReply(nodeMessageReply, domActionResult.getErrors()), getSelf());
288             }
289
290             @Override
291             public void onFailure(final Throwable throwable) {
292                 recipient.tell(new Failure(throwable), getSelf());
293             }
294         }, MoreExecutors.directExecutor());
295     }
296
297     private void registerSlaveMountPoint(final ActorRef masterReference) {
298         unregisterSlaveMountPoint();
299
300         slaveSalManager = new SlaveSalFacade(id, context().system(), actorResponseWaitTime, mountPointService);
301
302         resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
303     }
304
305     private EffectiveModelContextFactory createSchemaContextFactory(final ActorRef masterReference) {
306         final var dispatcher = getContext().dispatcher();
307         final var remoteYangTextSourceProvider = new ProxyYangTextSourceProvider(masterReference, dispatcher,
308             actorResponseWaitTime);
309         final var remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider, dispatcher);
310         final var schemaRegistry = schemaProvider.registry();
311
312         registeredSchemas = sourceIdentifiers.stream()
313             .map(sourceId -> schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
314                 YangTextSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
315             .collect(Collectors.toList());
316
317         return schemaProvider.repository().createEffectiveModelContextFactory();
318     }
319
320     private void resolveSchemaContext(final EffectiveModelContextFactory schemaContextFactory,
321             final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, final int tries) {
322         final var schemaContextFuture = schemaContextFactory.createEffectiveModelContext(sourceIdentifiers);
323         Futures.addCallback(schemaContextFuture, new FutureCallback<>() {
324             @Override
325             public void onSuccess(final EffectiveModelContext result) {
326                 executeInSelf(() -> {
327                     // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
328                     // resolution.
329                     if (slaveSalManager == localSlaveSalManager) {
330                         LOG.info("{}: Schema context resolved: {} - registering slave mount point", id,
331                             result.getModules());
332                         final var actorSystem = context().system();
333                         final var rpcProxy = new ProxyDOMRpcService(actorSystem, masterReference, id,
334                             actorResponseWaitTime);
335                         slaveSalManager.registerSlaveMountPoint(result, masterReference, new RemoteDeviceServices(
336                             (Rpcs.Normalized) () -> rpcProxy,
337                             new ProxyDOMActionService(actorSystem, masterReference, id, actorResponseWaitTime)));
338                     }
339                 });
340             }
341
342             @Override
343             public void onFailure(final Throwable throwable) {
344                 executeInSelf(() -> {
345                     if (slaveSalManager == localSlaveSalManager) {
346                         final Throwable cause = Throwables.getRootCause(throwable);
347                         if (cause instanceof AskTimeoutException) {
348                             if (tries <= 5 || tries % 10 == 0) {
349                                 LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable);
350                             }
351
352                             resolveSchemaContext(schemaContextFactory, localSlaveSalManager,
353                                     masterReference, tries + 1);
354                         } else {
355                             LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
356                                     id, throwable);
357                             closeSchemaSourceRegistrations();
358                         }
359                     }
360                 });
361             }
362         }, MoreExecutors.directExecutor());
363     }
364
365     private void closeSchemaSourceRegistrations() {
366         if (registeredSchemas != null) {
367             registeredSchemas.forEach(Registration::close);
368             registeredSchemas = null;
369         }
370     }
371 }