Bump upstreams to SNAPSHOTs
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.actors;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.Status.Failure;
13 import akka.actor.Status.Success;
14 import akka.pattern.AskTimeoutException;
15 import akka.util.Timeout;
16 import com.google.common.base.Throwables;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.stream.Collectors;
24 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
25 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
26 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
27 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
28 import org.opendaylight.mdsal.dom.api.DOMActionResult;
29 import org.opendaylight.mdsal.dom.api.DOMActionService;
30 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
35 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
36 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
37 import org.opendaylight.mdsal.dom.api.DOMRpcService;
38 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
39 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
40 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMActionService;
41 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
42 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
43 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
44 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
45 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
46 import org.opendaylight.netconf.topology.singleton.messages.ContainerNodeMessage;
47 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
48 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
49 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
50 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
51 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
52 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
53 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
54 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
55 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
56 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessage;
57 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessageReply;
58 import org.opendaylight.netconf.topology.singleton.messages.netconf.NetconfDataTreeServiceRequest;
59 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
60 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
61 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
62 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
63 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
64 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
65 import org.opendaylight.yangtools.yang.common.QName;
66 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
67 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
68 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
69 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
70 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
71 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
72 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
73 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
74 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
75 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
76 import scala.concurrent.duration.Duration;
77
78 public class NetconfNodeActor extends AbstractUntypedActor {
79     private final Duration writeTxIdleTimeout;
80     private final DOMMountPointService mountPointService;
81
82     private SchemaSourceRegistry schemaRegistry;
83     private SchemaRepository schemaRepository;
84     private Timeout actorResponseWaitTime;
85     private RemoteDeviceId id;
86     private NetconfTopologySetup setup;
87     private List<SourceIdentifier> sourceIdentifiers;
88     private DOMRpcService deviceRpc;
89     private DOMActionService deviceAction;
90     private SlaveSalFacade slaveSalManager;
91     private DOMDataBroker deviceDataBroker;
92     private NetconfDataTreeService netconfService;
93     //readTxActor can be shared
94     private ActorRef readTxActor;
95     private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
96
97     public static Props props(final NetconfTopologySetup setup, final RemoteDeviceId id,
98             final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) {
99         return Props.create(NetconfNodeActor.class, () ->
100                 new NetconfNodeActor(setup, id, actorResponseWaitTime, mountPointService));
101     }
102
103     protected NetconfNodeActor(final NetconfTopologySetup setup,
104                                final RemoteDeviceId id, final Timeout actorResponseWaitTime,
105                                final DOMMountPointService mountPointService) {
106         this.setup = setup;
107         this.id = id;
108         this.schemaRegistry = setup.getSchemaResourcesDTO().getSchemaRegistry();
109         this.schemaRepository = setup.getSchemaResourcesDTO().getSchemaRepository();
110         this.actorResponseWaitTime = actorResponseWaitTime;
111         this.writeTxIdleTimeout = setup.getIdleTimeout();
112         this.mountPointService = mountPointService;
113     }
114
115     @SuppressWarnings("checkstyle:IllegalCatch")
116     @Override
117     public void handleReceive(final Object message) {
118         LOG.debug("{}:  received message {}", id, message);
119
120         if (message instanceof CreateInitialMasterActorData) { // master
121             final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
122             sourceIdentifiers = masterActorData.getSourceIndentifiers();
123             this.deviceDataBroker = masterActorData.getDeviceDataBroker();
124             this.netconfService = masterActorData.getNetconfDataTreeService();
125             final DOMDataTreeReadTransaction tx = deviceDataBroker.newReadOnlyTransaction();
126             readTxActor = context().actorOf(ReadTransactionActor.props(tx));
127             this.deviceRpc = masterActorData.getDeviceRpc();
128             this.deviceAction = masterActorData.getDeviceAction();
129
130             sender().tell(new MasterActorDataInitialized(), self());
131             LOG.debug("{}: Master is ready.", id);
132         } else if (message instanceof RefreshSetupMasterActorData) {
133             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
134             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
135             sender().tell(new MasterActorDataInitialized(), self());
136         } else if (message instanceof AskForMasterMountPoint) { // master
137             AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint) message;
138             // only master contains reference to deviceDataBroker
139             if (deviceDataBroker != null) {
140                 LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
141                 askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
142                     sender());
143             } else {
144                 LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
145                 sender().tell(new Failure(new NotMasterException(self())), self());
146             }
147         } else if (message instanceof YangTextSchemaSourceRequest) { // master
148             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
149             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
150         } else if (message instanceof NewReadTransactionRequest) { // master
151             sender().tell(new Success(readTxActor), self());
152         } else if (message instanceof NewWriteTransactionRequest) { // master
153             try {
154                 final DOMDataTreeWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
155                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
156                 sender().tell(new Success(txActor), self());
157             } catch (final Exception t) {
158                 sender().tell(new Failure(t), self());
159             }
160         } else if (message instanceof NewReadWriteTransactionRequest) {
161             try {
162                 final DOMDataTreeReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
163                 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
164                 sender().tell(new Success(txActor), self());
165             } catch (final Exception t) {
166                 sender().tell(new Failure(t), self());
167             }
168         } else if (message instanceof InvokeRpcMessage) { // master
169             final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
170             invokeSlaveRpc(invokeRpcMessage.getSchemaPath().lastNodeIdentifier(),
171                 invokeRpcMessage.getNormalizedNodeMessage(), sender());
172         } else if (message instanceof InvokeActionMessage) { // master
173             final InvokeActionMessage invokeActionMessage = (InvokeActionMessage) message;
174             LOG.info("InvokeActionMessage Details : {}", invokeActionMessage.toString());
175             invokeSlaveAction(invokeActionMessage.getSchemaPath(), invokeActionMessage.getContainerNodeMessage(),
176                 invokeActionMessage.getDOMDataTreeIdentifier(), sender());
177         } else if (message instanceof RegisterMountPoint) { //slaves
178             RegisterMountPoint registerMountPoint = (RegisterMountPoint) message;
179             sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
180             registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
181             sender().tell(new Success(null), self());
182         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
183             unregisterSlaveMountPoint();
184         } else if (message instanceof RefreshSlaveActor) { //slave
185             actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
186             id = ((RefreshSlaveActor) message).getId();
187             schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry();
188             setup = ((RefreshSlaveActor) message).getSetup();
189             schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
190         } else if (message instanceof NetconfDataTreeServiceRequest) {
191             ActorRef netconfActor = context()
192                 .actorOf(NetconfDataTreeServiceActor.props(netconfService, writeTxIdleTimeout));
193             sender().tell(new Success(netconfActor), self());
194         }
195     }
196
197     @Override
198     public void postStop() throws Exception {
199         try {
200             super.postStop();
201         } finally {
202             unregisterSlaveMountPoint();
203         }
204     }
205
206     private void unregisterSlaveMountPoint() {
207         if (slaveSalManager != null) {
208             slaveSalManager.close();
209             slaveSalManager = null;
210         }
211
212         closeSchemaSourceRegistrations();
213     }
214
215     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
216         final ListenableFuture<YangTextSchemaSource> schemaSourceFuture =
217                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
218
219         Futures.addCallback(schemaSourceFuture, new FutureCallback<YangTextSchemaSource>() {
220             @Override
221             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
222                 try {
223                     LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier);
224                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
225                 } catch (IOException e) {
226                     sender.tell(new Failure(e), getSelf());
227                 }
228             }
229
230             @Override
231             public void onFailure(final Throwable throwable) {
232                 LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable);
233                 sender.tell(new Failure(throwable), getSelf());
234             }
235         }, MoreExecutors.directExecutor());
236     }
237
238     private void invokeSlaveRpc(final QName qname, final NormalizedNodeMessage normalizedNodeMessage,
239                                 final ActorRef recipient) {
240
241         LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, qname, normalizedNodeMessage,
242                 deviceRpc);
243
244         final ListenableFuture<? extends DOMRpcResult> rpcResult = deviceRpc.invokeRpc(qname,
245                 normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null);
246
247         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
248             @Override
249             public void onSuccess(final DOMRpcResult domRpcResult) {
250                 LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, qname, domRpcResult);
251
252                 if (domRpcResult == null) {
253                     recipient.tell(new EmptyResultResponse(), getSender());
254                     return;
255                 }
256                 NormalizedNodeMessage nodeMessageReply = null;
257                 if (domRpcResult.getResult() != null) {
258                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.empty(),
259                             domRpcResult.getResult());
260                 }
261                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
262             }
263
264             @Override
265             public void onFailure(final Throwable throwable) {
266                 recipient.tell(new Failure(throwable), getSelf());
267             }
268         }, MoreExecutors.directExecutor());
269     }
270
271     /**
272      * Invoking Action on Slave Node in Odl Cluster Environment.
273      *
274      * @param schemaPath {@link Absolute}
275      * @param containerNodeMessage {@link ContainerNodeMessage}
276      * @param domDataTreeIdentifier {@link DOMDataTreeIdentifier}
277      * @param recipient {@link ActorRef}
278      */
279     private void invokeSlaveAction(final Absolute schemaPath, final ContainerNodeMessage containerNodeMessage,
280         final DOMDataTreeIdentifier domDataTreeIdentifier, final ActorRef recipient) {
281         LOG.info("{}: invokeSlaveAction for {}, input: {}, identifier: {} on action service {}", id, schemaPath,
282             containerNodeMessage, domDataTreeIdentifier, deviceAction);
283
284         final ListenableFuture<? extends DOMActionResult> actionResult = deviceAction.invokeAction(schemaPath,
285             domDataTreeIdentifier, containerNodeMessage != null ? containerNodeMessage.getNode() : null);
286
287         Futures.addCallback(actionResult, new FutureCallback<DOMActionResult>() {
288
289             @Override
290             public void onSuccess(final DOMActionResult domActionResult) {
291                 LOG.debug("{}: invokeSlaveAction for {}, domActionResult: {}", id, schemaPath, domActionResult);
292                 if (domActionResult == null) {
293                     recipient.tell(new EmptyResultResponse(), getSender());
294                     return;
295                 }
296
297                 //Check DomActionResult containing Ok onSuccess pass empty nodeMessageReply
298                 ContainerNodeMessage nodeMessageReply = domActionResult.getOutput().map(ContainerNodeMessage::new)
299                         .orElse(null);
300                 recipient.tell(new InvokeActionMessageReply(nodeMessageReply, domActionResult.getErrors()), getSelf());
301             }
302
303             @Override
304             public void onFailure(final Throwable throwable) {
305                 recipient.tell(new Failure(throwable), getSelf());
306             }
307         }, MoreExecutors.directExecutor());
308     }
309
310     private void registerSlaveMountPoint(final ActorRef masterReference) {
311         unregisterSlaveMountPoint();
312
313         slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService);
314
315         resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
316     }
317
318     private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
319         return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
320     }
321
322     private DOMActionService getDOMActionService(final ActorRef masterReference) {
323         return new ProxyDOMActionService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
324     }
325
326     private EffectiveModelContextFactory createSchemaContextFactory(final ActorRef masterReference) {
327         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
328                 new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime);
329         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
330                 getContext().dispatcher());
331
332         registeredSchemas = sourceIdentifiers.stream()
333                 .map(sourceId ->
334                         schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
335                                 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
336                 .collect(Collectors.toList());
337
338         return schemaRepository.createEffectiveModelContextFactory();
339     }
340
341     private void resolveSchemaContext(final EffectiveModelContextFactory schemaContextFactory,
342             final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, final int tries) {
343         final ListenableFuture<EffectiveModelContext> schemaContextFuture =
344                 schemaContextFactory.createEffectiveModelContext(sourceIdentifiers);
345         Futures.addCallback(schemaContextFuture, new FutureCallback<EffectiveModelContext>() {
346             @Override
347             public void onSuccess(final EffectiveModelContext result) {
348                 executeInSelf(() -> {
349                     // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
350                     // resolution.
351                     if (slaveSalManager == localSlaveSalManager) {
352                         LOG.info("{}: Schema context resolved: {} - registering slave mount point",
353                                 id, result.getModules());
354                         slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference),
355                             getDOMActionService(masterReference), masterReference);
356                     }
357                 });
358             }
359
360             @Override
361             public void onFailure(final Throwable throwable) {
362                 executeInSelf(() -> {
363                     if (slaveSalManager == localSlaveSalManager) {
364                         final Throwable cause = Throwables.getRootCause(throwable);
365                         if (cause instanceof AskTimeoutException) {
366                             if (tries <= 5 || tries % 10 == 0) {
367                                 LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable);
368                             }
369
370                             resolveSchemaContext(schemaContextFactory, localSlaveSalManager,
371                                     masterReference, tries + 1);
372                         } else {
373                             LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
374                                     id, throwable);
375                             closeSchemaSourceRegistrations();
376                         }
377                     }
378                 });
379             }
380         }, MoreExecutors.directExecutor());
381     }
382
383     private void closeSchemaSourceRegistrations() {
384         if (registeredSchemas != null) {
385             registeredSchemas.forEach(SchemaSourceRegistration::close);
386             registeredSchemas = null;
387         }
388     }
389 }