Teach NETCONF about YANG 1.1 actions in cluster topology
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.actors;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.Status.Failure;
13 import akka.actor.Status.Success;
14 import akka.pattern.AskTimeoutException;
15 import akka.util.Timeout;
16 import com.google.common.base.Throwables;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.stream.Collectors;
24 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
25 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
26 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
27 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
28 import org.opendaylight.mdsal.dom.api.DOMActionResult;
29 import org.opendaylight.mdsal.dom.api.DOMActionService;
30 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
35 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
36 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
37 import org.opendaylight.mdsal.dom.api.DOMRpcService;
38 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
39 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMActionService;
40 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
41 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
42 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
43 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
44 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
45 import org.opendaylight.netconf.topology.singleton.messages.ContainerNodeMessage;
46 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
47 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
48 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
49 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
50 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
51 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
52 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
53 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
54 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
55 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessage;
56 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessageReply;
57 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
58 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
59 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
60 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
61 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
62 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
64 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
65 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
66 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
67 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
68 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
69 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
70 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
71 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
72 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
73 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
74 import scala.concurrent.duration.Duration;
75
76 public class NetconfNodeActor extends AbstractUntypedActor {
77     private final Duration writeTxIdleTimeout;
78     private final DOMMountPointService mountPointService;
79
80     private SchemaSourceRegistry schemaRegistry;
81     private SchemaRepository schemaRepository;
82     private Timeout actorResponseWaitTime;
83     private RemoteDeviceId id;
84     private NetconfTopologySetup setup;
85     private List<SourceIdentifier> sourceIdentifiers;
86     private DOMRpcService deviceRpc;
87     private DOMActionService deviceAction;
88     private SlaveSalFacade slaveSalManager;
89     private DOMDataBroker deviceDataBroker;
90     //readTxActor can be shared
91     private ActorRef readTxActor;
92     private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
93
94     public static Props props(final NetconfTopologySetup setup,
95                               final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
96                               final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
97                               final DOMMountPointService mountPointService) {
98         return Props.create(NetconfNodeActor.class, () ->
99                 new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository, actorResponseWaitTime,
100                         mountPointService));
101     }
102
103     protected NetconfNodeActor(final NetconfTopologySetup setup,
104                                final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
105                                final SchemaRepository schemaRepository, final Timeout actorResponseWaitTime,
106                                final DOMMountPointService mountPointService) {
107         this.setup = setup;
108         this.id = id;
109         this.schemaRegistry = schemaRegistry;
110         this.schemaRepository = schemaRepository;
111         this.actorResponseWaitTime = actorResponseWaitTime;
112         this.writeTxIdleTimeout = setup.getIdleTimeout();
113         this.mountPointService = mountPointService;
114     }
115
116     @SuppressWarnings("checkstyle:IllegalCatch")
117     @Override
118     public void handleReceive(final Object message) {
119         LOG.debug("{}:  received message {}", id, message);
120
121         if (message instanceof CreateInitialMasterActorData) { // master
122
123             final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
124             sourceIdentifiers = masterActorData.getSourceIndentifiers();
125             this.deviceDataBroker = masterActorData.getDeviceDataBroker();
126             final DOMDataTreeReadTransaction tx = deviceDataBroker.newReadOnlyTransaction();
127             readTxActor = context().actorOf(ReadTransactionActor.props(tx));
128             this.deviceRpc = masterActorData.getDeviceRpc();
129             this.deviceAction = masterActorData.getDeviceAction();
130
131             sender().tell(new MasterActorDataInitialized(), self());
132
133             LOG.debug("{}: Master is ready.", id);
134
135         } else if (message instanceof  RefreshSetupMasterActorData) {
136             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
137             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
138             sender().tell(new MasterActorDataInitialized(), self());
139         } else if (message instanceof AskForMasterMountPoint) { // master
140             AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint)message;
141
142             // only master contains reference to deviceDataBroker
143             if (deviceDataBroker != null) {
144                 LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
145                 askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
146                         sender());
147             } else {
148                 LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
149                 sender().tell(new Failure(new NotMasterException(self())), self());
150             }
151
152         } else if (message instanceof YangTextSchemaSourceRequest) { // master
153
154             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
155             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
156
157         } else if (message instanceof NewReadTransactionRequest) { // master
158             sender().tell(new Success(readTxActor), self());
159         } else if (message instanceof NewWriteTransactionRequest) { // master
160             try {
161                 final DOMDataTreeWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
162                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
163                 sender().tell(new Success(txActor), self());
164             } catch (final Exception t) {
165                 sender().tell(new Failure(t), self());
166             }
167
168         } else if (message instanceof NewReadWriteTransactionRequest) {
169             try {
170                 final DOMDataTreeReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
171                 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
172                 sender().tell(new Success(txActor), self());
173             } catch (final Exception t) {
174                 sender().tell(new Failure(t), self());
175             }
176         } else if (message instanceof InvokeRpcMessage) { // master
177             final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
178             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
179
180         } else if (message instanceof InvokeActionMessage) { // master
181             final InvokeActionMessage invokeActionMessage = (InvokeActionMessage) message;
182             LOG.info("InvokeActionMessage Details : {}", invokeActionMessage.toString());
183             invokeSlaveAction(invokeActionMessage.getSchemaPath(), invokeActionMessage.getContainerNodeMessage(),
184                 invokeActionMessage.getDOMDataTreeIdentifier(), sender());
185         } else if (message instanceof RegisterMountPoint) { //slaves
186             RegisterMountPoint registerMountPoint = (RegisterMountPoint)message;
187             sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
188             registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
189             sender().tell(new Success(null), self());
190         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
191             unregisterSlaveMountPoint();
192         } else if (message instanceof RefreshSlaveActor) { //slave
193             actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
194             id = ((RefreshSlaveActor) message).getId();
195             schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry();
196             setup = ((RefreshSlaveActor) message).getSetup();
197             schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
198         }
199
200     }
201
202     @Override
203     public void postStop() throws Exception {
204         try {
205             super.postStop();
206         } finally {
207             unregisterSlaveMountPoint();
208         }
209     }
210
211     private void unregisterSlaveMountPoint() {
212         if (slaveSalManager != null) {
213             slaveSalManager.close();
214             slaveSalManager = null;
215         }
216
217         closeSchemaSourceRegistrations();
218     }
219
220     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
221         final ListenableFuture<YangTextSchemaSource> schemaSourceFuture =
222                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
223
224         Futures.addCallback(schemaSourceFuture, new FutureCallback<YangTextSchemaSource>() {
225             @Override
226             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
227                 try {
228                     LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier);
229                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
230                 } catch (IOException e) {
231                     sender.tell(new Failure(e), getSelf());
232                 }
233             }
234
235             @Override
236             public void onFailure(final Throwable throwable) {
237                 LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable);
238                 sender.tell(new Failure(throwable), getSelf());
239             }
240         }, MoreExecutors.directExecutor());
241     }
242
243     private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
244                                 final ActorRef recipient) {
245
246         LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, schemaPath, normalizedNodeMessage,
247                 deviceRpc);
248
249         final ListenableFuture<DOMRpcResult> rpcResult = deviceRpc.invokeRpc(schemaPath,
250                 normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null);
251
252         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
253             @Override
254             public void onSuccess(final DOMRpcResult domRpcResult) {
255                 LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, schemaPath, domRpcResult);
256
257                 if (domRpcResult == null) {
258                     recipient.tell(new EmptyResultResponse(), getSender());
259                     return;
260                 }
261                 NormalizedNodeMessage nodeMessageReply = null;
262                 if (domRpcResult.getResult() != null) {
263                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
264                             domRpcResult.getResult());
265                 }
266                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
267             }
268
269             @Override
270             public void onFailure(final Throwable throwable) {
271                 recipient.tell(new Failure(throwable), getSelf());
272             }
273         }, MoreExecutors.directExecutor());
274     }
275
276     /**
277      * Invoking Action on Slave Node in Odl Cluster Environment.
278      *
279      * @param schemaPath {@link SchemaPath}
280      * @param containerNodeMessage {@link ContainerNodeMessage}
281      * @param domDataTreeIdentifier {@link DOMDataTreeIdentifier}
282      * @param recipient {@link ActorRef}
283      */
284     private void invokeSlaveAction(final SchemaPath schemaPath, final ContainerNodeMessage containerNodeMessage,
285         final DOMDataTreeIdentifier domDataTreeIdentifier, final ActorRef recipient) {
286         LOG.info("{}: invokeSlaveAction for {}, input: {}, identifier: {} on action service {}", id, schemaPath,
287             containerNodeMessage, domDataTreeIdentifier, deviceAction);
288
289         final ListenableFuture<? extends DOMActionResult> actionResult = deviceAction.invokeAction(schemaPath,
290             domDataTreeIdentifier, containerNodeMessage != null ? containerNodeMessage.getNode() : null);
291
292         Futures.addCallback(actionResult, new FutureCallback<DOMActionResult>() {
293
294             @Override
295             public void onSuccess(final DOMActionResult domActionResult) {
296                 LOG.debug("{}: invokeSlaveAction for {}, domActionResult: {}", id, schemaPath, domActionResult);
297                 if (domActionResult == null) {
298                     recipient.tell(new EmptyResultResponse(), getSender());
299                     return;
300                 }
301
302                 //Check DomActionResult containing Ok onSuccess pass empty nodeMessageReply
303                 ContainerNodeMessage nodeMessageReply = domActionResult.getOutput().map(ContainerNodeMessage::new)
304                     .orElse(null);
305                 recipient.tell(new InvokeActionMessageReply(nodeMessageReply, domActionResult.getErrors()), getSelf());
306             }
307
308             @Override
309             public void onFailure(final Throwable throwable) {
310                 recipient.tell(new Failure(throwable), getSelf());
311             }
312         }, MoreExecutors.directExecutor());
313     }
314
315     private void registerSlaveMountPoint(final ActorRef masterReference) {
316         unregisterSlaveMountPoint();
317
318         slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService);
319
320         resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
321     }
322
323     private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
324         return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
325     }
326
327     private DOMActionService getDOMActionService(final ActorRef masterReference) {
328         return new ProxyDOMActionService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
329     }
330
331     private EffectiveModelContextFactory createSchemaContextFactory(final ActorRef masterReference) {
332         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
333                 new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime);
334         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
335                 getContext().dispatcher());
336
337         registeredSchemas = sourceIdentifiers.stream()
338                 .map(sourceId ->
339                         schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
340                                 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
341                 .collect(Collectors.toList());
342
343         return schemaRepository.createEffectiveModelContextFactory();
344     }
345
346     private void resolveSchemaContext(final EffectiveModelContextFactory schemaContextFactory,
347             final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, final int tries) {
348         final ListenableFuture<EffectiveModelContext> schemaContextFuture =
349                 schemaContextFactory.createEffectiveModelContext(sourceIdentifiers);
350         Futures.addCallback(schemaContextFuture, new FutureCallback<SchemaContext>() {
351             @Override
352             public void onSuccess(final SchemaContext result) {
353                 executeInSelf(() -> {
354                     // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
355                     // resolution.
356                     if (slaveSalManager == localSlaveSalManager) {
357                         LOG.info("{}: Schema context resolved: {} - registering slave mount point",
358                                 id, result.getModules());
359                         slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference),
360                             getDOMActionService(masterReference), masterReference);
361                     }
362                 });
363             }
364
365             @Override
366             public void onFailure(final Throwable throwable) {
367                 executeInSelf(() -> {
368                     if (slaveSalManager == localSlaveSalManager) {
369                         final Throwable cause = Throwables.getRootCause(throwable);
370                         if (cause instanceof AskTimeoutException) {
371                             if (tries <= 5 || tries % 10 == 0) {
372                                 LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable);
373                             }
374
375                             resolveSchemaContext(schemaContextFactory, localSlaveSalManager,
376                                     masterReference, tries + 1);
377                         } else {
378                             LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
379                                     id, throwable);
380                             closeSchemaSourceRegistrations();
381                         }
382                     }
383                 });
384             }
385         }, MoreExecutors.directExecutor());
386     }
387
388     private void closeSchemaSourceRegistrations() {
389         if (registeredSchemas != null) {
390             registeredSchemas.forEach(SchemaSourceRegistration::close);
391             registeredSchemas = null;
392         }
393     }
394 }