Update MRI projects for Aluminium
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.actors;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.Status.Failure;
13 import akka.actor.Status.Success;
14 import akka.pattern.AskTimeoutException;
15 import akka.util.Timeout;
16 import com.google.common.base.Throwables;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
22 import java.io.IOException;
23 import java.util.List;
24 import java.util.stream.Collectors;
25 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
26 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
27 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
28 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
29 import org.opendaylight.mdsal.dom.api.DOMActionResult;
30 import org.opendaylight.mdsal.dom.api.DOMActionService;
31 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
35 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
36 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
37 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
38 import org.opendaylight.mdsal.dom.api.DOMRpcService;
39 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
40 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMActionService;
41 import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
42 import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
43 import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
44 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
45 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
46 import org.opendaylight.netconf.topology.singleton.messages.ContainerNodeMessage;
47 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
48 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
49 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
50 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
51 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
52 import org.opendaylight.netconf.topology.singleton.messages.RefreshSlaveActor;
53 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
54 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
55 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
56 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessage;
57 import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessageReply;
58 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
59 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
60 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
61 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadTransactionRequest;
62 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewReadWriteTransactionRequest;
63 import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
64 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
65 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
66 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
67 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
68 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
69 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
70 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
71 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
72 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
73 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
74 import scala.concurrent.duration.Duration;
75
76 public class NetconfNodeActor extends AbstractUntypedActor {
77     private final Duration writeTxIdleTimeout;
78     private final DOMMountPointService mountPointService;
79
80     private SchemaSourceRegistry schemaRegistry;
81     private SchemaRepository schemaRepository;
82     private Timeout actorResponseWaitTime;
83     private RemoteDeviceId id;
84     private NetconfTopologySetup setup;
85     private List<SourceIdentifier> sourceIdentifiers;
86     private DOMRpcService deviceRpc;
87     private DOMActionService deviceAction;
88     private SlaveSalFacade slaveSalManager;
89     private DOMDataBroker deviceDataBroker;
90     //readTxActor can be shared
91     private ActorRef readTxActor;
92     private List<SchemaSourceRegistration<YangTextSchemaSource>> registeredSchemas;
93
94     public static Props props(final NetconfTopologySetup setup, final RemoteDeviceId id,
95             final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) {
96         return Props.create(NetconfNodeActor.class, () ->
97                 new NetconfNodeActor(setup, id, actorResponseWaitTime, mountPointService));
98     }
99
100     protected NetconfNodeActor(final NetconfTopologySetup setup,
101                                final RemoteDeviceId id, final Timeout actorResponseWaitTime,
102                                final DOMMountPointService mountPointService) {
103         this.setup = setup;
104         this.id = id;
105         this.schemaRegistry = setup.getSchemaResourcesDTO().getSchemaRegistry();
106         this.schemaRepository = setup.getSchemaResourcesDTO().getSchemaRepository();
107         this.actorResponseWaitTime = actorResponseWaitTime;
108         this.writeTxIdleTimeout = setup.getIdleTimeout();
109         this.mountPointService = mountPointService;
110     }
111
112     @SuppressWarnings("checkstyle:IllegalCatch")
113     @Override
114     public void handleReceive(final Object message) {
115         LOG.debug("{}:  received message {}", id, message);
116
117         if (message instanceof CreateInitialMasterActorData) { // master
118
119             final CreateInitialMasterActorData masterActorData = (CreateInitialMasterActorData) message;
120             sourceIdentifiers = masterActorData.getSourceIndentifiers();
121             this.deviceDataBroker = masterActorData.getDeviceDataBroker();
122             final DOMDataTreeReadTransaction tx = deviceDataBroker.newReadOnlyTransaction();
123             readTxActor = context().actorOf(ReadTransactionActor.props(tx));
124             this.deviceRpc = masterActorData.getDeviceRpc();
125             this.deviceAction = masterActorData.getDeviceAction();
126
127             sender().tell(new MasterActorDataInitialized(), self());
128
129             LOG.debug("{}: Master is ready.", id);
130
131         } else if (message instanceof RefreshSetupMasterActorData) {
132             setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
133             id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
134             sender().tell(new MasterActorDataInitialized(), self());
135         } else if (message instanceof AskForMasterMountPoint) { // master
136             AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint) message;
137
138             // only master contains reference to deviceDataBroker
139             if (deviceDataBroker != null) {
140                 LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
141                 askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
142                     sender());
143             } else {
144                 LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
145                 sender().tell(new Failure(new NotMasterException(self())), self());
146             }
147
148         } else if (message instanceof YangTextSchemaSourceRequest) { // master
149
150             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
151             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
152
153         } else if (message instanceof NewReadTransactionRequest) { // master
154             sender().tell(new Success(readTxActor), self());
155         } else if (message instanceof NewWriteTransactionRequest) { // master
156             try {
157                 final DOMDataTreeWriteTransaction tx = deviceDataBroker.newWriteOnlyTransaction();
158                 final ActorRef txActor = context().actorOf(WriteTransactionActor.props(tx, writeTxIdleTimeout));
159                 sender().tell(new Success(txActor), self());
160             } catch (final Exception t) {
161                 sender().tell(new Failure(t), self());
162             }
163
164         } else if (message instanceof NewReadWriteTransactionRequest) {
165             try {
166                 final DOMDataTreeReadWriteTransaction tx = deviceDataBroker.newReadWriteTransaction();
167                 final ActorRef txActor = context().actorOf(ReadWriteTransactionActor.props(tx, writeTxIdleTimeout));
168                 sender().tell(new Success(txActor), self());
169             } catch (final Exception t) {
170                 sender().tell(new Failure(t), self());
171             }
172         } else if (message instanceof InvokeRpcMessage) { // master
173             final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
174             invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
175         } else if (message instanceof InvokeActionMessage) { // master
176             final InvokeActionMessage invokeActionMessage = (InvokeActionMessage) message;
177             LOG.info("InvokeActionMessage Details : {}", invokeActionMessage.toString());
178             invokeSlaveAction(invokeActionMessage.getSchemaPath(), invokeActionMessage.getContainerNodeMessage(),
179                 invokeActionMessage.getDOMDataTreeIdentifier(), sender());
180         } else if (message instanceof RegisterMountPoint) { //slaves
181             RegisterMountPoint registerMountPoint = (RegisterMountPoint) message;
182             sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
183             registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
184             sender().tell(new Success(null), self());
185         } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
186             unregisterSlaveMountPoint();
187         } else if (message instanceof RefreshSlaveActor) { //slave
188             actorResponseWaitTime = ((RefreshSlaveActor) message).getActorResponseWaitTime();
189             id = ((RefreshSlaveActor) message).getId();
190             schemaRegistry = ((RefreshSlaveActor) message).getSchemaRegistry();
191             setup = ((RefreshSlaveActor) message).getSetup();
192             schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
193         }
194     }
195
196     @Override
197     public void postStop() throws Exception {
198         try {
199             super.postStop();
200         } finally {
201             unregisterSlaveMountPoint();
202         }
203     }
204
205     private void unregisterSlaveMountPoint() {
206         if (slaveSalManager != null) {
207             slaveSalManager.close();
208             slaveSalManager = null;
209         }
210
211         closeSchemaSourceRegistrations();
212     }
213
214     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
215         final ListenableFuture<YangTextSchemaSource> schemaSourceFuture =
216                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
217
218         Futures.addCallback(schemaSourceFuture, new FutureCallback<YangTextSchemaSource>() {
219             @Override
220             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
221                 try {
222                     LOG.debug("{}: getSchemaSource for {} succeeded", id, sourceIdentifier);
223                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
224                 } catch (IOException e) {
225                     sender.tell(new Failure(e), getSelf());
226                 }
227             }
228
229             @Override
230             public void onFailure(final Throwable throwable) {
231                 LOG.debug("{}: getSchemaSource for {} failed", id, sourceIdentifier, throwable);
232                 sender.tell(new Failure(throwable), getSelf());
233             }
234         }, MoreExecutors.directExecutor());
235     }
236
237     private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
238                                 final ActorRef recipient) {
239
240         LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, schemaPath, normalizedNodeMessage,
241                 deviceRpc);
242
243         final ListenableFuture<? extends DOMRpcResult> rpcResult = deviceRpc.invokeRpc(schemaPath,
244                 normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null);
245
246         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
247             @Override
248             public void onSuccess(final DOMRpcResult domRpcResult) {
249                 LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, schemaPath, domRpcResult);
250
251                 if (domRpcResult == null) {
252                     recipient.tell(new EmptyResultResponse(), getSender());
253                     return;
254                 }
255                 NormalizedNodeMessage nodeMessageReply = null;
256                 if (domRpcResult.getResult() != null) {
257                     nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.empty(),
258                             domRpcResult.getResult());
259                 }
260                 recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
261             }
262
263             @Override
264             public void onFailure(final Throwable throwable) {
265                 recipient.tell(new Failure(throwable), getSelf());
266             }
267         }, MoreExecutors.directExecutor());
268     }
269
270     /**
271      * Invoking Action on Slave Node in Odl Cluster Environment.
272      *
273      * @param schemaPath {@link SchemaPath}
274      * @param containerNodeMessage {@link ContainerNodeMessage}
275      * @param domDataTreeIdentifier {@link DOMDataTreeIdentifier}
276      * @param recipient {@link ActorRef}
277      */
278     private void invokeSlaveAction(final SchemaPath schemaPath, final ContainerNodeMessage containerNodeMessage,
279         final DOMDataTreeIdentifier domDataTreeIdentifier, final ActorRef recipient) {
280         LOG.info("{}: invokeSlaveAction for {}, input: {}, identifier: {} on action service {}", id, schemaPath,
281             containerNodeMessage, domDataTreeIdentifier, deviceAction);
282
283         final ListenableFuture<? extends DOMActionResult> actionResult = deviceAction.invokeAction(schemaPath,
284             domDataTreeIdentifier, containerNodeMessage != null ? containerNodeMessage.getNode() : null);
285
286         Futures.addCallback(actionResult, new FutureCallback<DOMActionResult>() {
287
288             @Override
289             public void onSuccess(final DOMActionResult domActionResult) {
290                 LOG.debug("{}: invokeSlaveAction for {}, domActionResult: {}", id, schemaPath, domActionResult);
291                 if (domActionResult == null) {
292                     recipient.tell(new EmptyResultResponse(), getSender());
293                     return;
294                 }
295
296                 //Check DomActionResult containing Ok onSuccess pass empty nodeMessageReply
297                 ContainerNodeMessage nodeMessageReply = domActionResult.getOutput().map(ContainerNodeMessage::new)
298                         .orElse(null);
299                 recipient.tell(new InvokeActionMessageReply(nodeMessageReply, domActionResult.getErrors()), getSelf());
300             }
301
302             @Override
303             public void onFailure(final Throwable throwable) {
304                 recipient.tell(new Failure(throwable), getSelf());
305             }
306         }, MoreExecutors.directExecutor());
307     }
308
309     private void registerSlaveMountPoint(final ActorRef masterReference) {
310         unregisterSlaveMountPoint();
311
312         slaveSalManager = new SlaveSalFacade(id, setup.getActorSystem(), actorResponseWaitTime, mountPointService);
313
314         resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
315     }
316
317     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
318             justification = "https://github.com/spotbugs/spotbugs/issues/811")
319     private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
320         return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
321     }
322
323     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
324             justification = "https://github.com/spotbugs/spotbugs/issues/811")
325     private DOMActionService getDOMActionService(final ActorRef masterReference) {
326         return new ProxyDOMActionService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
327     }
328
329     private EffectiveModelContextFactory createSchemaContextFactory(final ActorRef masterReference) {
330         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
331                 new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime);
332         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
333                 getContext().dispatcher());
334
335         registeredSchemas = sourceIdentifiers.stream()
336                 .map(sourceId ->
337                         schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
338                                 YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())))
339                 .collect(Collectors.toList());
340
341         return schemaRepository.createEffectiveModelContextFactory();
342     }
343
344     private void resolveSchemaContext(final EffectiveModelContextFactory schemaContextFactory,
345             final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, final int tries) {
346         final ListenableFuture<EffectiveModelContext> schemaContextFuture =
347                 schemaContextFactory.createEffectiveModelContext(sourceIdentifiers);
348         Futures.addCallback(schemaContextFuture, new FutureCallback<EffectiveModelContext>() {
349             @Override
350             public void onSuccess(final EffectiveModelContext result) {
351                 executeInSelf(() -> {
352                     // Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
353                     // resolution.
354                     if (slaveSalManager == localSlaveSalManager) {
355                         LOG.info("{}: Schema context resolved: {} - registering slave mount point",
356                                 id, result.getModules());
357                         slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference),
358                             getDOMActionService(masterReference), masterReference);
359                     }
360                 });
361             }
362
363             @Override
364             public void onFailure(final Throwable throwable) {
365                 executeInSelf(() -> {
366                     if (slaveSalManager == localSlaveSalManager) {
367                         final Throwable cause = Throwables.getRootCause(throwable);
368                         if (cause instanceof AskTimeoutException) {
369                             if (tries <= 5 || tries % 10 == 0) {
370                                 LOG.warn("{}: Failed to resolve schema context - retrying...", id, throwable);
371                             }
372
373                             resolveSchemaContext(schemaContextFactory, localSlaveSalManager,
374                                     masterReference, tries + 1);
375                         } else {
376                             LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
377                                     id, throwable);
378                             closeSchemaSourceRegistrations();
379                         }
380                     }
381                 });
382             }
383         }, MoreExecutors.directExecutor());
384     }
385
386     private void closeSchemaSourceRegistrations() {
387         if (registeredSchemas != null) {
388             registeredSchemas.forEach(SchemaSourceRegistration::close);
389             registeredSchemas = null;
390         }
391     }
392 }